diff options
author | Julian <almightyju@arandomworld.co.uk> | 2017-09-29 17:50:57 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-29 17:50:57 +0100 |
commit | 796e1b039f803c9e7d8785a4b54f2cb3c9be8528 (patch) | |
tree | a2136649cd41f8a31139e3e02ed124c090e9c456 | |
parent | cacc7747efaf924712a1738c2cf46dbfcdb6bc60 (diff) | |
parent | 9751b067748e6fa0f15741613d95eb4737adf75f (diff) | |
download | couchdb-796e1b039f803c9e7d8785a4b54f2cb3c9be8528.tar.gz |
Merge branch 'master' into master
103 files changed, 2035 insertions, 1029 deletions
diff --git a/.travis.yml b/.travis.yml index 56a2b7d71..8aebaabc6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -58,7 +58,7 @@ before_script: - cd ../.. script: - - make check mango-test + - make check after_failure: - build-aux/logfile-uploader.py @@ -91,6 +91,7 @@ fauxton: share/www check: all @$(MAKE) eunit @$(MAKE) javascript + @$(MAKE) mango-test # @$(MAKE) build-test diff --git a/README-DEV.rst b/README-DEV.rst index 73c684cd2..0f12fa44c 100644 --- a/README-DEV.rst +++ b/README-DEV.rst @@ -12,20 +12,29 @@ If you're unsure what this means, ignore this document. Dependencies ------------ -You may need: +You need the following to run tests: + +* `Python <https://www.python.org/>`_ +* `nose <https://nose.readthedocs.io/en/latest/>`_ +* `requests <http://docs.python-requests.org/>`_ +* `hypothesis <https://pypi.python.org/pypi/hypothesis>`_ + +You need the following optionally to build documentation: * `Sphinx <http://sphinx.pocoo.org/>`_ * `GNU help2man <http://www.gnu.org/software/help2man/>`_ * `GnuPG <http://www.gnupg.org/>`_ + +You need the following optionally to build releases: + * `md5sum <http://www.microbrew.org/tools/md5sha1sum/>`_ * `sha1sum <http://www.microbrew.org/tools/md5sha1sum/>`_ + +You need the following optionally to build Fauxton: + * `nodejs <http://nodejs.org/>`_ * `npm <https://www.npmjs.com/>`_ -The first four of these optional dependencies are required for building the -documentation. The next three are needed to build releases. The last two are for -needed to build fauxton. - You will need these optional dependencies installed if: * You are working on the documentation, or @@ -50,14 +59,16 @@ Debian-based (inc. Ubuntu) Systems :: - sudo apt-get install help2man python-sphinx gnupg nodejs npm + sudo apt-get install help2man python-sphinx gnupg nodejs npm \ + python-hypothesis python-requests python-nose Gentoo-based Systems ~~~~~~~~~~~~~~~~~~~~ :: - sudo emerge gnupg coreutils pkgconfig help2man sphinx + sudo emerge gnupg coreutils pkgconfig help2man sphinx python + sudo pip install hypothesis requests nose RedHat-based (Fedora, Centos, RHEL) Systems ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -65,7 +76,8 @@ RedHat-based (Fedora, Centos, RHEL) Systems :: sudo yum install help2man python-sphinx python-docutils \ - python-pygments gnupg nodejs npm + python-pygments gnupg nodejs npm python-nose python-requests \ + python-hypothesis Mac OS X ~~~~~~~~ @@ -85,9 +97,7 @@ If you don't already have pip installed, install it:: Now, install the required Python packages:: - sudo pip install sphinx - sudo pip install docutils - sudo pip install pygments + sudo pip install sphinx docutils pygments nose requests hypothesis FreeBSD ~~~~~~~ @@ -95,6 +105,7 @@ FreeBSD :: pkg install help2man gnupg py27-sphinx node + pip install nose requests hypothesis Windows ~~~~~~~ @@ -156,7 +156,7 @@ cat > rel/couchdb.config << EOF {log_file, "$LOG_FILE"}. {fauxton_root, "./share/www"}. {user, "$COUCHDB_USER"}. -{node_name, "-name couchdb@localhost"}. +{node_name, "-name couchdb@127.0.0.1"}. {cluster_port, 5984}. {backend_port, 5986}. EOF diff --git a/rebar.config.script b/rebar.config.script index 654fb2f12..61c34fd00 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -60,11 +60,11 @@ DepDescs = [ {fauxton, {url, "https://github.com/apache/couchdb-fauxton"}, {tag, "v1.1.13"}, [raw]}, %% Third party deps -{folsom, "folsom", {tag, "CouchDB-0.8.1"}}, +{folsom, "folsom", {tag, "CouchDB-0.8.2"}}, {ibrowse, "ibrowse", {tag, "CouchDB-4.0.1"}}, {jiffy, "jiffy", {tag, "CouchDB-0.14.11-1"}}, {mochiweb, "mochiweb", {tag, "CouchDB-2.12.0-1"}}, -{meck, "meck", {tag, "0.8.2"}} +{meck, "meck", {tag, "0.8.8"}} ], diff --git a/rel/overlay/bin/remsh b/rel/overlay/bin/remsh new file mode 100755 index 000000000..963c16a10 --- /dev/null +++ b/rel/overlay/bin/remsh @@ -0,0 +1,76 @@ +#!/bin/sh + +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +COUCHDB_BIN_DIR=$(cd "${0%/*}" && pwd) +ERTS_BIN_DIR=$COUCHDB_BIN_DIR/../ +ROOTDIR=${ERTS_BIN_DIR%/*} +START_ERL=$(cat "$ROOTDIR/releases/start_erl.data") +ERTS_VSN=${START_ERL% *} +APP_VSN=${START_ERL#* } +BINDIR=$ROOTDIR/erts-$ERTS_VSN/bin + +PROGNAME=${0##*/} +VERBOSE="" +NODE="couchdb@127.0.0.1" +COOKIE=monster +LHOST=127.0.0.1 + +printHelpAndExit() { + echo "Usage: ${PROGNAME} [OPTION]... [-- <additional Erlang cli options>]" + echo " -c cookie specify shared Erlang cookie (default: monster)" + echo " -l HOST specify remsh's host name (default: 127.0.0.1)" + echo " -m use output of \`hostname -f\` as remsh's host name" + echo " -n NAME@HOST specify couchdb's Erlang node name (-name in vm.args)" + echo " -v verbose; print invocation line" + echo " -h this help message" + exit +} + +while getopts ":hn:c:l:mv" optionName; do + case "$optionName" in + h) + printHelpAndExit 0 + ;; + n) + NODE=$OPTARG + ;; + c) + COOKIE=$OPTARG + ;; + l) + LHOST=$OPTARG + ;; + m) + LHOST=$(hostname -f) + ;; + v) + VERBOSE=0 + ;; + \?) + echo "Invalid option: -$OPTARG" >&2 + printHelpAndExit 0 + ;; + esac +done + +shift $((OPTIND - 1)) + +if [ ! -z "$VERBOSE" ]; then + # cheap but it works + set -x +fi + +exec "$BINDIR/erl" -boot "$ROOTDIR/releases/$APP_VSN/start_clean" \ + -name remsh$$@$LHOST -remsh $NODE -hidden -setcookie $COOKIE \ + "$@" diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 27a952c74..122853542 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -370,7 +370,7 @@ connection_timeout = 30000 ; Request timeout ;request_timeout = infinity ; If a request fails, the replicator will retry it up to N times. -retries_per_request = 10 +retries_per_request = 5 ; Use checkpoints ;use_checkpoints = true ; Checkpoint interval @@ -542,3 +542,7 @@ writer = stderr ; syslog_port = 514 ; syslog_appid = couchdb ; syslog_facility = local2 + +[stats] +; Stats collection interval in seconds. Default 10 seconds. +;interval = 10 diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index a7796fcdf..c8826d581 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -84,7 +84,7 @@ handle_changes_req1(#httpd{}=Req, Db) -> #changes_args{filter=Raw, style=Style} = Args0 = parse_changes_query(Req), ChangesArgs = Args0#changes_args{ filter_fun = couch_changes:configure_filter(Raw, Style, Req, Db), - db_open_options = [{user_ctx, Db#db.user_ctx}] + db_open_options = [{user_ctx, couch_db:get_user_ctx(Db)}] }, Max = chttpd:chunked_response_buffer_size(), case ChangesArgs#changes_args.feed of @@ -253,7 +253,7 @@ handle_view_cleanup_req(Req, Db) -> handle_design_req(#httpd{ path_parts=[_DbName, _Design, Name, <<"_",_/binary>> = Action | _Rest] }=Req, Db) -> - DbName = mem3:dbname(Db#db.name), + DbName = mem3:dbname(couch_db:name(Db)), case ddoc_cache:open(DbName, <<"_design/", Name/binary>>) of {ok, DDoc} -> Handler = chttpd_handlers:design_handler(Action, fun bad_action_req/3), @@ -309,7 +309,8 @@ delete_db_req(#httpd{}=Req, DbName) -> do_db_req(#httpd{path_parts=[DbName|_], user_ctx=Ctx}=Req, Fun) -> fabric:get_security(DbName, [{user_ctx,Ctx}]), % calls check_is_reader - Fun(Req, #db{name=DbName, user_ctx=Ctx}). + {ok, Db} = couch_db:clustered_db(DbName, Ctx), + Fun(Req, Db). db_req(#httpd{method='GET',path_parts=[DbName]}=Req, _Db) -> % measure the time required to generate the etag, see if it's worth it @@ -767,16 +768,17 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) -> } = parse_doc_query(Req), couch_doc:validate_docid(DocId), + DbName = couch_db:name(Db), W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))), Options = [{user_ctx,Ctx}, {w,W}], - Loc = absolute_uri(Req, [$/, couch_util:url_encode(Db#db.name), + Loc = absolute_uri(Req, [$/, couch_util:url_encode(DbName), $/, couch_util:url_encode(DocId)]), RespHeaders = [{"Location", Loc}], case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of ("multipart/related;" ++ _) = ContentType -> couch_httpd:check_max_request_length(Req), - couch_httpd_multipart:num_mp_writers(mem3:n(mem3:dbname(Db#db.name), DocId)), + couch_httpd_multipart:num_mp_writers(mem3:n(mem3:dbname(DbName), DocId)), {ok, Doc0, WaitFun, Parser} = couch_doc:doc_from_multi_part_stream(ContentType, fun() -> receive_request_data(Req) end), Doc = couch_doc_from_req(Req, DocId, Doc0), @@ -833,8 +835,9 @@ db_doc_req(#httpd{method='COPY', user_ctx=Ctx}=Req, Db, SourceDocId) -> HttpCode = 202 end, % respond + DbName = couch_db:name(Db), {PartRes} = update_doc_result_to_json(TargetDocId, {ok, NewTargetRev}), - Loc = absolute_uri(Req, "/" ++ couch_util:url_encode(Db#db.name) ++ "/" ++ couch_util:url_encode(TargetDocId)), + Loc = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName) ++ "/" ++ couch_util:url_encode(TargetDocId)), send_json(Req, HttpCode, [{"Location", Loc}, {"ETag", "\"" ++ ?b2l(couch_doc:rev_to_str(NewTargetRev)) ++ "\""}], @@ -1057,8 +1060,8 @@ couch_doc_from_req(Req, DocId, Json) -> % couch_doc_open(Db, DocId) -> % couch_doc_open(Db, DocId, nil, []). -couch_doc_open(#db{} = Db, DocId, Rev, Options0) -> - Options = [{user_ctx, Db#db.user_ctx} | Options0], +couch_doc_open(Db, DocId, Rev, Options0) -> + Options = [{user_ctx, couch_db:get_user_ctx(Db)} | Options0], case Rev of nil -> % open most recent rev case fabric:open_doc(Db, DocId, Options) of @@ -1262,7 +1265,7 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa HttpCode = 202 end, erlang:put(mochiweb_request_recv, true), - #db{name=DbName} = Db, + DbName = couch_db:name(Db), {Status, Headers} = case Method of 'DELETE' -> diff --git a/src/chttpd/src/chttpd_external.erl b/src/chttpd/src/chttpd_external.erl index 4abeecb37..64664b98e 100644 --- a/src/chttpd/src/chttpd_external.erl +++ b/src/chttpd/src/chttpd_external.erl @@ -120,16 +120,22 @@ json_req_obj_field(<<"secObj">>, #httpd{user_ctx=UserCtx}, Db, _DocId) -> get_db_security(Db, UserCtx). -get_db_info(#db{main_pid = nil} = Db) -> - fabric:get_db_info(Db); -get_db_info(#db{} = Db) -> - couch_db:get_db_info(Db). +get_db_info(Db) -> + case couch_db:is_clustered(Db) of + true -> + fabric:get_db_info(Db); + false -> + couch_db:get_db_info(Db) + end. -get_db_security(#db{main_pid = nil}=Db, #user_ctx{}) -> - fabric:get_security(Db); -get_db_security(#db{}=Db, #user_ctx{}) -> - couch_db:get_security(Db). +get_db_security(Db, #user_ctx{}) -> + case couch_db:is_clustered(Db) of + true -> + fabric:get_security(Db); + false -> + couch_db:get_security(Db) + end. to_json_terms(Data) -> diff --git a/src/chttpd/src/chttpd_show.erl b/src/chttpd/src/chttpd_show.erl index 48f14257e..c6d232c96 100644 --- a/src/chttpd/src/chttpd_show.erl +++ b/src/chttpd/src/chttpd_show.erl @@ -199,7 +199,8 @@ handle_view_list_req(Req, _Db, _DDoc) -> handle_view_list(Req, Db, DDoc, LName, {ViewDesignName, ViewName}, Keys) -> %% Will throw an exception if the _list handler is missing couch_util:get_nested_json_value(DDoc#doc.body, [<<"lists">>, LName]), - {ok, VDoc} = ddoc_cache:open(Db#db.name, <<"_design/", ViewDesignName/binary>>), + DbName = couch_db:name(Db), + {ok, VDoc} = ddoc_cache:open(DbName, <<"_design/", ViewDesignName/binary>>), CB = fun couch_mrview_show:list_cb/2, QueryArgs = couch_mrview_http:parse_params(Req, Keys), Options = [{user_ctx, Req#httpd.user_ctx}], diff --git a/src/couch/include/couch_db.hrl b/src/couch/include/couch_db.hrl index 7049c6e5f..37f5792c3 100644 --- a/src/couch/include/couch_db.hrl +++ b/src/couch/include/couch_db.hrl @@ -30,12 +30,7 @@ -define(i2b(V), couch_util:integer_to_boolean(V)). -define(b2i(V), couch_util:boolean_to_integer(V)). -define(term_to_bin(T), term_to_binary(T, [{minor_version, 1}])). --define(term_size(T), - try - erlang:external_size(T) - catch _:_ -> - byte_size(?term_to_bin(T)) - end). +-define(term_size(T), erlang:external_size(T, [{minor_version, 1}])). -define(DEFAULT_ATTACHMENT_CONTENT_TYPE, <<"application/octet-stream">>). @@ -129,33 +124,6 @@ handler }). --record(db, { - main_pid = nil, - compactor_pid = nil, - instance_start_time, % number of microsecs since jan 1 1970 as a binary string - fd, - fd_monitor, - header = couch_db_header:new(), - committed_update_seq, - id_tree, - seq_tree, - local_tree, - update_seq, - name, - filepath, - validate_doc_funs = undefined, - security = [], - security_ptr = nil, - user_ctx = #user_ctx{}, - waiting_delayed_commit = nil, - revs_limit = 1000, - fsync_options = [], - options = [], - compression, - before_doc_update = nil, % nil | fun(Doc, Db) -> NewDoc - after_doc_read = nil % nil | fun(Doc, Db) -> NewDoc -}). - -record(view_fold_helper_funs, { reduce_count, passed_end, diff --git a/src/couch/src/couch_att.erl b/src/couch/src/couch_att.erl index 5c040a8c4..3380f5739 100644 --- a/src/couch/src/couch_att.erl +++ b/src/couch/src/couch_att.erl @@ -494,6 +494,9 @@ flush(Fd, Att) -> flush_data(Fd, fetch(data, Att), Att). +flush_data(Fd, {stream, {couch_bt_engine_stream, {OtherFd, StreamPointer}}}, + Att) -> + flush_data(Fd, {OtherFd, StreamPointer}, Att); flush_data(Fd, {Fd0, _}, Att) when Fd0 == Fd -> % already written to our file, nothing to write Att; diff --git a/src/couch/src/couch_auth_cache.erl b/src/couch/src/couch_auth_cache.erl index 1c4b86651..16c59d19a 100644 --- a/src/couch/src/couch_auth_cache.erl +++ b/src/couch/src/couch_auth_cache.erl @@ -322,13 +322,15 @@ refresh_entries(AuthDb) -> nil -> ok; AuthDb2 -> - case AuthDb2#db.update_seq > AuthDb#db.update_seq of + AuthDbSeq = couch_db:get_update_seq(AuthDb), + AuthDb2Seq = couch_db:get_update_seq(AuthDb2), + case AuthDb2Seq > AuthDbSeq of true -> {ok, _, _} = couch_db:enum_docs_since( AuthDb2, - AuthDb#db.update_seq, + AuthDbSeq, fun(DocInfo, _, _) -> refresh_entry(AuthDb2, DocInfo) end, - AuthDb#db.update_seq, + AuthDbSeq, [] ), true = ets:insert(?STATE, {auth_db, AuthDb2}); @@ -386,7 +388,9 @@ cache_needs_refresh() -> nil -> false; AuthDb2 -> - AuthDb2#db.update_seq > AuthDb#db.update_seq + AuthDbSeq = couch_db:get_update_seq(AuthDb), + AuthDb2Seq = couch_db:get_update_seq(AuthDb2), + AuthDb2Seq > AuthDbSeq end end, false @@ -407,7 +411,7 @@ exec_if_auth_db(Fun) -> exec_if_auth_db(Fun, DefRes) -> case ets:lookup(?STATE, auth_db) of - [{auth_db, #db{} = AuthDb}] -> + [{auth_db, AuthDb}] -> Fun(AuthDb); _ -> DefRes diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl index 26d272a88..3e4175014 100644 --- a/src/couch/src/couch_changes.erl +++ b/src/couch/src/couch_changes.erl @@ -79,9 +79,10 @@ handle_changes(Args1, Req, Db0, Type) -> _ -> {false, undefined, undefined} end, + DbName = couch_db:name(Db0), {StartListenerFun, View} = if UseViewChanges -> {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view( - Db0#db.name, DDocName, ViewName, #mrargs{}), + DbName, DDocName, ViewName, #mrargs{}), case View0#mrview.seq_btree of #btree{} -> ok; @@ -90,14 +91,14 @@ handle_changes(Args1, Req, Db0, Type) -> end, SNFun = fun() -> couch_event:link_listener( - ?MODULE, handle_view_event, {self(), DDocName}, [{dbname, Db0#db.name}] + ?MODULE, handle_view_event, {self(), DDocName}, [{dbname, DbName}] ) end, {SNFun, View0}; true -> SNFun = fun() -> couch_event:link_listener( - ?MODULE, handle_db_event, self(), [{dbname, Db0#db.name}] + ?MODULE, handle_db_event, self(), [{dbname, DbName}] ) end, {SNFun, undefined} @@ -112,7 +113,7 @@ handle_changes(Args1, Req, Db0, Type) -> end, View2 = if UseViewChanges -> {ok, {_, View1, _}, _, _} = couch_mrview_util:get_view( - Db0#db.name, DDocName, ViewName, #mrargs{}), + DbName, DDocName, ViewName, #mrargs{}), View1; true -> undefined @@ -220,11 +221,11 @@ configure_filter("_view", Style, Req, Db) -> catch _:_ -> view end, - case Db#db.id_tree of - undefined -> + case couch_db:is_clustered(Db) of + true -> DIR = fabric_util:doc_id_and_rev(DDoc), {fetch, FilterType, Style, DIR, VName}; - _ -> + false -> {FilterType, Style, DDoc, VName} end; [] -> @@ -243,11 +244,11 @@ configure_filter(FilterName, Style, Req, Db) -> [DName, FName] -> {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>), check_member_exists(DDoc, [<<"filters">>, FName]), - case Db#db.id_tree of - undefined -> + case couch_db:is_clustered(Db) of + true -> DIR = fabric_util:doc_id_and_rev(DDoc), {fetch, custom, Style, Req, DIR, FName}; - _ -> + false-> {custom, Style, Req, DDoc, FName} end; @@ -396,15 +397,19 @@ check_fields(_Fields) -> throw({bad_request, "Selector error: fields must be JSON array"}). -open_ddoc(#db{name=DbName, id_tree=undefined}, DDocId) -> - case ddoc_cache:open_doc(mem3:dbname(DbName), DDocId) of - {ok, _} = Resp -> Resp; - Else -> throw(Else) - end; open_ddoc(Db, DDocId) -> - case couch_db:open_doc(Db, DDocId, [ejson_body]) of - {ok, _} = Resp -> Resp; - Else -> throw(Else) + DbName = couch_db:name(Db), + case couch_db:is_clustered(Db) of + true -> + case ddoc_cache:open_doc(mem3:dbname(DbName), DDocId) of + {ok, _} = Resp -> Resp; + Else -> throw(Else) + end; + false -> + case couch_db:open_doc(Db, DDocId, [ejson_body]) of + {ok, _} = Resp -> Resp; + Else -> throw(Else) + end end. @@ -572,7 +577,7 @@ can_optimize(_, _) -> send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) -> - Lookups = couch_btree:lookup(Db#db.id_tree, DocIds), + Lookups = couch_db:get_full_doc_infos(Db, DocIds), FullInfos = lists:foldl(fun ({ok, FDI}, Acc) -> [FDI | Acc]; (not_found, Acc) -> Acc @@ -581,11 +586,9 @@ send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) -> send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) -> - FoldFun = fun(FullDocInfo, _, Acc) -> - {ok, [FullDocInfo | Acc]} - end, + FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end, KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}], - {ok, _, FullInfos} = couch_btree:fold(Db#db.id_tree, FoldFun, [], KeyOpts), + {ok, FullInfos} = couch_db:fold_docs(Db, FoldFun, [], KeyOpts), send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0). @@ -646,8 +649,8 @@ keep_sending_changes(Args, Acc0, FirstRound) -> true -> case wait_updated(Timeout, TimeoutFun, UserAcc2) of {updated, UserAcc4} -> - DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions], - case couch_db:open(Db#db.name, DbOptions1) of + DbOptions1 = [{user_ctx, couch_db:get_user_ctx(Db)} | DbOptions], + case couch_db:open(couch_db:name(Db), DbOptions1) of {ok, Db2} -> ?MODULE:keep_sending_changes( Args#changes_args{limit=NewLimit}, @@ -671,7 +674,8 @@ keep_sending_changes(Args, Acc0, FirstRound) -> maybe_refresh_view(_, undefined, undefined) -> undefined; maybe_refresh_view(Db, DDocName, ViewName) -> - {ok, {_, View, _}, _, _} = couch_mrview_util:get_view(Db#db.name, DDocName, ViewName, #mrargs{}), + DbName = couch_db:name(Db), + {ok, {_, View, _}, _, _} = couch_mrview_util:get_view(DbName, DDocName, ViewName, #mrargs{}), View. end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) -> diff --git a/src/couch/src/couch_compaction_daemon.erl b/src/couch/src/couch_compaction_daemon.erl index 024b867d0..38e185da8 100644 --- a/src/couch/src/couch_compaction_daemon.erl +++ b/src/couch/src/couch_compaction_daemon.erl @@ -319,7 +319,7 @@ can_db_compact(#config{db_frag = Threshold} = Config, Db) -> {Frag, SpaceRequired} = frag(DbInfo), couch_log:debug("Fragmentation for database `~s` is ~p%, estimated" " space for compaction is ~p bytes.", - [Db#db.name, Frag, SpaceRequired]), + [couch_db:name(Db), Frag, SpaceRequired]), case check_frag(Threshold, Frag) of false -> false; @@ -332,7 +332,7 @@ can_db_compact(#config{db_frag = Threshold} = Config, Db) -> couch_log:warning("Compaction daemon - skipping database `~s` " "compaction: the estimated necessary disk space is about ~p" " bytes but the currently available disk space is ~p bytes.", - [Db#db.name, SpaceRequired, Free]), + [couch_db:name(Db), SpaceRequired, Free]), false end end diff --git a/src/couch/src/couch_compress.erl b/src/couch/src/couch_compress.erl index 71588b228..cfcc2a481 100644 --- a/src/couch/src/couch_compress.erl +++ b/src/couch/src/couch_compress.erl @@ -14,6 +14,7 @@ -export([compress/2, decompress/1, is_compressed/2]). -export([get_compression_method/0]). +-export([uncompressed_size/1]). -include_lib("couch/include/couch_db.hrl"). @@ -83,3 +84,16 @@ is_compressed(Term, _Method) when not is_binary(Term) -> is_compressed(_, _) -> error(invalid_compression). + +uncompressed_size(<<?SNAPPY_PREFIX, Rest/binary>>) -> + {ok, Size} = snappy:uncompressed_length(Rest), + Size; +uncompressed_size(<<?COMPRESSED_TERM_PREFIX, Size:32, _/binary>> = _Bin) -> + % See http://erlang.org/doc/apps/erts/erl_ext_dist.html + % The uncompressed binary would be encoded with <<131, Rest/binary>> + % so need to add 1 for 131 + Size + 1; +uncompressed_size(<<?TERM_PREFIX, _/binary>> = Bin) -> + byte_size(Bin); +uncompressed_size(_) -> + error(invalid_compression). diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index 7a1afa750..5e720c284 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -12,32 +12,118 @@ -module(couch_db). --export([open/2,open_int/2,close/1,create/2,get_db_info/1,get_design_docs/1]). --export([start_compact/1, cancel_compact/1]). --export([wait_for_compaction/1, wait_for_compaction/2]). --export([is_idle/1,monitor/1,count_changes_since/2]). --export([update_doc/3,update_doc/4,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]). --export([get_doc_info/2,get_full_doc_info/2,get_full_doc_infos/2]). --export([open_doc/2,open_doc/3,open_doc_revs/4]). --export([set_revs_limit/2,get_revs_limit/1]). --export([get_missing_revs/2,name/1,get_update_seq/1,get_committed_update_seq/1]). --export([get_uuid/1, get_epochs/1, get_compacted_seq/1]). --export([enum_docs/4,enum_docs_since/5]). --export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]). --export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]). --export([start_link/3,open_doc_int/3,ensure_full_commit/1,ensure_full_commit/2]). --export([set_security/2,get_security/1]). --export([changes_since/4,changes_since/5,read_doc/2,new_revid/1]). --export([check_is_admin/1, is_admin/1, check_is_member/1, get_doc_count/1]). --export([reopen/1, is_system_db/1, compression/1, make_doc/5]). --export([load_validation_funs/1]). --export([check_md5/2, with_stream/3]). --export([monitored_by/1]). --export([normalize_dbname/1]). --export([validate_dbname/1]). --export([dbname_suffix/1]). +-export([ + create/2, + open/2, + open_int/2, + incref/1, + reopen/1, + close/1, + + clustered_db/2, + clustered_db/3, + + monitor/1, + monitored_by/1, + is_idle/1, + + is_admin/1, + check_is_admin/1, + check_is_member/1, + + name/1, + compression/1, + get_after_doc_read_fun/1, + get_before_doc_update_fun/1, + get_committed_update_seq/1, + get_compacted_seq/1, + get_compactor_pid/1, + get_db_info/1, + get_doc_count/1, + get_epochs/1, + get_filepath/1, + get_instance_start_time/1, + get_last_purged/1, + get_pid/1, + get_revs_limit/1, + get_security/1, + get_update_seq/1, + get_user_ctx/1, + get_uuid/1, + get_purge_seq/1, + + is_db/1, + is_system_db/1, + is_clustered/1, + + increment_update_seq/1, + set_revs_limit/2, + set_security/2, + set_user_ctx/2, + + ensure_full_commit/1, + ensure_full_commit/2, + + load_validation_funs/1, + + open_doc/2, + open_doc/3, + open_doc_revs/4, + open_doc_int/3, + read_doc/2, + get_doc_info/2, + get_full_doc_info/2, + get_full_doc_infos/2, + get_missing_revs/2, + get_design_docs/1, + + update_doc/3, + update_doc/4, + update_docs/4, + update_docs/2, + update_docs/3, + delete_doc/3, + + purge_docs/2, + + with_stream/3, + + fold_docs/4, + fold_local_docs/4, + enum_docs/4, + enum_docs_reduce_to_count/1, + + enum_docs_since/5, + enum_docs_since_reduce_to_count/1, + changes_since/4, + changes_since/5, + count_changes_since/2, + + calculate_start_seq/3, + owner_of/2, + + start_compact/1, + cancel_compact/1, + wait_for_compaction/1, + wait_for_compaction/2, + + dbname_suffix/1, + normalize_dbname/1, + validate_dbname/1, + + check_md5/2, + make_doc/5, + new_revid/1 +]). + + +-export([ + start_link/3 +]). + -include_lib("couch/include/couch_db.hrl"). +-include("couch_db_int.hrl"). -define(DBNAME_REGEX, "^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*" % use the stock CouchDB regex @@ -112,9 +198,31 @@ reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) -> {ok, NewDb#db{user_ctx = UserCtx, fd_monitor = NewRef}} end. +incref(#db{fd = Fd} = Db) -> + Ref = erlang:monitor(process, Fd), + {ok, Db#db{fd_monitor = Ref}}. + +clustered_db(DbName, UserCtx) -> + clustered_db(DbName, UserCtx, []). + +clustered_db(DbName, UserCtx, SecProps) -> + {ok, #db{name = DbName, user_ctx = UserCtx, security = SecProps}}. + +is_db(#db{}) -> + true; +is_db(_) -> + false. + is_system_db(#db{options = Options}) -> lists:member(sys_db, Options). +is_clustered(#db{main_pid = nil}) -> + true; +is_clustered(#db{}) -> + false; +is_clustered(?NEW_PSE_DB = Db) -> + ?PSE_DB_MAIN_PID(Db) == undefined. + ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) -> ok = gen_server:call(Pid, full_commit, infinity), {ok, StartTime}. @@ -126,6 +234,8 @@ ensure_full_commit(Db, RequiredSeq) -> close(#db{fd_monitor=Ref}) -> erlang:demonitor(Ref, [flush]), + ok; +close(?NEW_PSE_DB) -> ok. is_idle(#db{compactor_pid=nil, waiting_delayed_commit=nil} = Db) -> @@ -295,12 +405,23 @@ increment_update_seq(#db{main_pid=Pid}) -> purge_docs(#db{main_pid=Pid}, IdsRevs) -> gen_server:call(Pid, {purge_docs, IdsRevs}). +get_after_doc_read_fun(#db{after_doc_read = Fun}) -> + Fun. + +get_before_doc_update_fun(#db{before_doc_update = Fun}) -> + Fun. + get_committed_update_seq(#db{committed_update_seq=Seq}) -> Seq. get_update_seq(#db{update_seq=Seq})-> Seq. +get_user_ctx(#db{user_ctx = UserCtx}) -> + UserCtx; +get_user_ctx(?NEW_PSE_DB = Db) -> + ?PSE_DB_USER_CTX(Db). + get_purge_seq(#db{}=Db) -> couch_db_header:purge_seq(Db#db.header). @@ -312,19 +433,33 @@ get_last_purged(#db{}=Db) -> couch_file:pread_term(Db#db.fd, Pointer) end. +get_pid(#db{main_pid = Pid}) -> + Pid. + get_doc_count(Db) -> - {ok, {Count, _, _}} = couch_btree:full_reduce(Db#db.id_tree), - {ok, Count}. + {ok, Reds} = couch_btree:full_reduce(Db#db.id_tree), + {ok, element(1, Reds)}. get_uuid(#db{}=Db) -> couch_db_header:uuid(Db#db.header). get_epochs(#db{}=Db) -> - couch_db_header:epochs(Db#db.header). + Epochs = couch_db_header:epochs(Db#db.header), + validate_epochs(Epochs), + Epochs. + +get_filepath(#db{filepath = FilePath}) -> + FilePath. + +get_instance_start_time(#db{instance_start_time = IST}) -> + IST. get_compacted_seq(#db{}=Db) -> couch_db_header:compacted_seq(Db#db.header). +get_compactor_pid(#db{compactor_pid = Pid}) -> + Pid. + get_db_info(Db) -> #db{fd=Fd, header=Header, @@ -503,7 +638,9 @@ get_members(#db{security=SecProps}) -> couch_util:get_value(<<"readers">>, SecProps, {[]})). get_security(#db{security=SecProps}) -> - {SecProps}. + {SecProps}; +get_security(?NEW_PSE_DB = Db) -> + {?PSE_DB_SECURITY(Db)}. set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) -> check_is_admin(Db), @@ -514,6 +651,9 @@ set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) -> set_security(_, _) -> throw(bad_request). +set_user_ctx(#db{} = Db, UserCtx) -> + {ok, Db#db{user_ctx = UserCtx}}. + validate_security_object(SecProps) -> Admins = couch_util:get_value(<<"admins">>, SecProps, {[]}), % we fallback to readers here for backwards compatibility @@ -549,7 +689,9 @@ set_revs_limit(_Db, _Limit) -> throw(invalid_revs_limit). name(#db{name=Name}) -> - Name. + Name; +name(?NEW_PSE_DB = Db) -> + ?PSE_DB_NAME(Db). compression(#db{compression=Compression}) -> Compression. @@ -1275,6 +1417,17 @@ enum_docs_since(Db, SinceSeq, InFun, Acc, Options) -> [{start_key, SinceSeq + 1} | Options]), {ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}. + +fold_docs(Db, InFun, InAcc, Opts) -> + Wrapper = fun(FDI, _, Acc) -> InFun(FDI, Acc) end, + {ok, _, AccOut} = couch_btree:fold(Db#db.id_tree, Wrapper, InAcc, Opts), + {ok, AccOut}. + +fold_local_docs(Db, InFun, InAcc, Opts) -> + Wrapper = fun(FDI, _, Acc) -> InFun(FDI, Acc) end, + {ok, _, AccOut} = couch_btree:fold(Db#db.local_tree, Wrapper, InAcc, Opts), + {ok, AccOut}. + enum_docs(Db, InFun, InAcc, Options0) -> {NS, Options} = extract_namespace(Options0), enum_docs(Db, NS, InFun, InAcc, Options). @@ -1298,6 +1451,78 @@ enum_docs(Db, NS, InFun, InAcc, Options0) -> Db#db.id_tree, FoldFun, InAcc, Options), {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}. + +calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) -> + Seq; +calculate_start_seq(Db, Node, {Seq, Uuid}) -> + % Treat the current node as the epoch node + calculate_start_seq(Db, Node, {Seq, Uuid, Node}); +calculate_start_seq(Db, _Node, {Seq, Uuid, EpochNode}) -> + case is_prefix(Uuid, get_uuid(Db)) of + true -> + case is_owner(EpochNode, Seq, get_epochs(Db)) of + true -> Seq; + false -> 0 + end; + false -> + %% The file was rebuilt, most likely in a different + %% order, so rewind. + 0 + end; +calculate_start_seq(Db, _Node, {replace, OriginalNode, Uuid, Seq}) -> + case is_prefix(Uuid, couch_db:get_uuid(Db)) of + true -> + start_seq(get_epochs(Db), OriginalNode, Seq); + false -> + {replace, OriginalNode, Uuid, Seq} + end. + + +validate_epochs(Epochs) -> + %% Assert uniqueness. + case length(Epochs) == length(lists:ukeysort(2, Epochs)) of + true -> ok; + false -> erlang:error(duplicate_epoch) + end, + %% Assert order. + case Epochs == lists:sort(fun({_, A}, {_, B}) -> B =< A end, Epochs) of + true -> ok; + false -> erlang:error(epoch_order) + end. + + +is_prefix(Pattern, Subject) -> + binary:longest_common_prefix([Pattern, Subject]) == size(Pattern). + + +is_owner(Node, Seq, Epochs) -> + Node =:= owner_of(Epochs, Seq). + + +owner_of(Db, Seq) when not is_list(Db) -> + owner_of(get_epochs(Db), Seq); +owner_of([], _Seq) -> + undefined; +owner_of([{EpochNode, EpochSeq} | _Rest], Seq) when Seq > EpochSeq -> + EpochNode; +owner_of([_ | Rest], Seq) -> + owner_of(Rest, Seq). + + +start_seq([{OrigNode, EpochSeq} | _], OrigNode, Seq) when Seq > EpochSeq -> + %% OrigNode is the owner of the Seq so we can safely stream from there + Seq; +start_seq([{_, NewSeq}, {OrigNode, _} | _], OrigNode, Seq) when Seq > NewSeq -> + %% We transferred this file before Seq was written on OrigNode, so we need + %% to stream from the beginning of the next epoch. Note that it is _not_ + %% necessary for the current node to own the epoch beginning at NewSeq + NewSeq; +start_seq([_ | Rest], OrigNode, Seq) -> + start_seq(Rest, OrigNode, Seq); +start_seq([], OrigNode, Seq) -> + erlang:error({epoch_mismatch, OrigNode, Seq}). + + extract_namespace(Options0) -> case proplists:split(Options0, [namespace]) of {[[{namespace, NS}]], Options} -> @@ -1636,6 +1861,30 @@ should_fail_validate_dbname(DbName) -> ok end)}. +calculate_start_seq_test() -> + %% uuid mismatch is always a rewind. + Hdr1 = couch_db_header:new(), + Hdr2 = couch_db_header:set(Hdr1, [{epochs, [{node1, 1}]}, {uuid, <<"uuid1">>}]), + ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, node1, {1, <<"uuid2">>})), + %% uuid matches and seq is owned by node. + Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]), + ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, node1, {2, <<"uuid1">>})), + %% uuids match but seq is not owned by node. + Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]), + ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, node1, {3, <<"uuid1">>})), + %% return integer if we didn't get a vector. + ?assertEqual(4, calculate_start_seq(#db{}, foo, 4)). + +is_owner_test() -> + ?assertNot(is_owner(foo, 1, [])), + ?assertNot(is_owner(foo, 1, [{foo, 1}])), + ?assert(is_owner(foo, 2, [{foo, 1}])), + ?assert(is_owner(foo, 50, [{bar, 100}, {foo, 1}])), + ?assert(is_owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])), + ?assert(is_owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])), + ?assertError(duplicate_epoch, validate_epochs([{foo, 1}, {bar, 1}])), + ?assertError(epoch_order, validate_epochs([{foo, 100}, {bar, 200}])). + to_binary(DbName) when is_list(DbName) -> ?l2b(DbName); to_binary(DbName) when is_binary(DbName) -> diff --git a/src/couch/src/couch_db_int.hrl b/src/couch/src/couch_db_int.hrl new file mode 100644 index 000000000..da1e45d75 --- /dev/null +++ b/src/couch/src/couch_db_int.hrl @@ -0,0 +1,93 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-record(db, { + main_pid = nil, + compactor_pid = nil, + instance_start_time, % number of microsecs since jan 1 1970 as a binary string + fd, + fd_monitor, + header = couch_db_header:new(), + committed_update_seq, + id_tree, + seq_tree, + local_tree, + update_seq, + name, + filepath, + validate_doc_funs = undefined, + security = [], + security_ptr = nil, + user_ctx = #user_ctx{}, + waiting_delayed_commit = nil, + revs_limit = 1000, + fsync_options = [], + options = [], + compression, + before_doc_update = nil, % nil | fun(Doc, Db) -> NewDoc + after_doc_read = nil % nil | fun(Doc, Db) -> NewDoc +}). + + +-record(new_pse_db, { + vsn, + name, + filepath, + + engine = {couch_bt_engine, undefined}, + + main_pid = nil, + compactor_pid = nil, + + committed_update_seq, + + instance_start_time, % number of microsecs since jan 1 1970 as a binary string + + user_ctx = #user_ctx{}, + security = [], + validate_doc_funs = undefined, + + before_doc_update = nil, % nil | fun(Doc, Db) -> NewDoc + after_doc_read = nil, % nil | fun(Doc, Db) -> NewDoc + + waiting_delayed_commit = nil, + + options = [], + compression +}). + + +-define(NEW_PSE_DB, { + db, + _, % Version + _, % Name + _, % FilePath + _, % Engine + _, % MainPid + _, % CompactorPid + _, % CommittedUpdateSeq + _, % InstanceStartTime + _, % UserCtx + _, % Security + _, % ValidateDocFuns + _, % BeforeDocUpdate + _, % AfterDocRead + _, % WaitingDelayedCommit + _, % Options + _ % Compression +}). + + +-define(PSE_DB_NAME(Db), element(3, Db)). +-define(PSE_DB_MAIN_PID(Db), element(6, Db)). +-define(PSE_DB_USER_CTX(Db), element(10, Db)). +-define(PSE_DB_SECURITY(Db), element(11, Db)). diff --git a/src/couch/src/couch_db_plugin.erl b/src/couch/src/couch_db_plugin.erl index 774e9e094..740b8121b 100644 --- a/src/couch/src/couch_db_plugin.erl +++ b/src/couch/src/couch_db_plugin.erl @@ -32,13 +32,15 @@ validate_dbname(DbName, Normalized, Default) -> maybe_handle(validate_dbname, [DbName, Normalized], Default). -before_doc_update(#db{before_doc_update = Fun} = Db, Doc0) -> +before_doc_update(Db, Doc0) -> + Fun = couch_db:get_before_doc_update_fun(Db), case with_pipe(before_doc_update, [Doc0, Db]) of [Doc1, _Db] when is_function(Fun) -> Fun(Doc1, Db); [Doc1, _Db] -> Doc1 end. -after_doc_read(#db{after_doc_read = Fun} = Db, Doc0) -> +after_doc_read(Db, Doc0) -> + Fun = couch_db:get_after_doc_read_fun(Db), case with_pipe(after_doc_read, [Doc0, Db]) of [Doc1, _Db] when is_function(Fun) -> Fun(Doc1, Db); [Doc1, _Db] -> Doc1 diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl index 78e0b8c19..ca61e04c6 100644 --- a/src/couch/src/couch_db_updater.erl +++ b/src/couch/src/couch_db_updater.erl @@ -20,6 +20,7 @@ -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). -include_lib("couch/include/couch_db.hrl"). +-include("couch_db_int.hrl"). -define(IDLE_LIMIT_DEFAULT, 61000). @@ -1079,14 +1080,13 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) -> {Body, AttInfos} = copy_doc_attachments(Db, Sp, DestFd), % In the future, we should figure out how to do this for % upgrade purposes. - EJsonBody = case is_binary(Body) of + ExternalSize = case is_binary(Body) of true -> - couch_compress:decompress(Body); + couch_compress:uncompressed_size(Body); false -> - Body + ?term_size(Body) end, SummaryChunk = make_doc_summary(NewDb, {Body, AttInfos}), - ExternalSize = ?term_size(EJsonBody), {ok, Pos, SummarySize} = couch_file:append_raw_chunk( DestFd, SummaryChunk), AttSizes = [{element(3,A), element(4,A)} || A <- AttInfos], @@ -1472,7 +1472,7 @@ get_meta_body_size(Meta, Summary) -> {ejson_size, ExternalSize} -> ExternalSize; false -> - ?term_size(couch_compress:decompress(Summary)) + couch_compress:uncompressed_size(Summary) end. diff --git a/src/couch/src/couch_httpd_db.erl b/src/couch/src/couch_httpd_db.erl index a6d83d619..34a1539aa 100644 --- a/src/couch/src/couch_httpd_db.erl +++ b/src/couch/src/couch_httpd_db.erl @@ -70,7 +70,8 @@ handle_changes_req(#httpd{method='GET'}=Req, Db, ChangesArgs, ChangesFun) -> handle_changes_req(#httpd{}=Req, _Db, _ChangesArgs, _ChangesFun) -> couch_httpd:send_method_not_allowed(Req, "GET,HEAD,POST"). -handle_changes_req1(Req, #db{name=DbName}=Db, ChangesArgs, ChangesFun) -> +handle_changes_req1(Req, Db, ChangesArgs, ChangesFun) -> + DbName = couch_db:name(Db), AuthDbName = ?l2b(config:get("couch_httpd_auth", "authentication_db")), case AuthDbName of DbName -> @@ -287,7 +288,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_ensure_full_commit">>]}=Req, Db) - RequiredSeq > CommittedSeq -> couch_db:ensure_full_commit(Db); true -> - {ok, Db#db.instance_start_time} + {ok, couch_db:get_instance_start_time(Db)} end end, send_json(Req, 201, {[ @@ -733,7 +734,8 @@ update_doc_result_to_json(DocId, Error) -> update_doc(Req, Db, DocId, #doc{deleted=false}=Doc) -> - Loc = absolute_uri(Req, "/" ++ couch_util:url_encode(Db#db.name) ++ "/" ++ couch_util:url_encode(DocId)), + DbName = couch_db:name(Db), + Loc = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName) ++ "/" ++ couch_util:url_encode(DocId)), update_doc(Req, Db, DocId, Doc, [{"Location", Loc}]); update_doc(Req, Db, DocId, Doc) -> update_doc(Req, Db, DocId, Doc, []). @@ -1037,7 +1039,7 @@ db_attachment_req(#httpd{method=Method,mochi_req=MochiReq}=Req, Db, DocId, FileN []; _ -> [{"Location", absolute_uri(Req, "/" ++ - couch_util:url_encode(Db#db.name) ++ "/" ++ + couch_util:url_encode(couch_db:name(Db)) ++ "/" ++ couch_util:url_encode(DocId) ++ "/" ++ couch_util:url_encode(FileName) )}] @@ -1149,7 +1151,7 @@ parse_changes_query(Req, Db) -> {"descending", "true"} -> Args#changes_args{dir=rev}; {"since", "now"} -> - UpdateSeq = couch_util:with_db(Db#db.name, fun(WDb) -> + UpdateSeq = couch_util:with_db(couch_db:name(Db), fun(WDb) -> couch_db:get_update_seq(WDb) end), Args#changes_args{since=UpdateSeq}; diff --git a/src/couch/src/couch_lru.erl b/src/couch/src/couch_lru.erl index b58a623d6..023515e7c 100644 --- a/src/couch/src/couch_lru.erl +++ b/src/couch/src/couch_lru.erl @@ -13,7 +13,7 @@ -module(couch_lru). -export([new/0, insert/2, update/2, close/1]). --include_lib("couch/include/couch_db.hrl"). +-include("couch_server_int.hrl"). new() -> {gb_trees:empty(), dict:new()}. @@ -43,16 +43,17 @@ close({Tree, _} = Cache) -> close_int(none, _) -> false; close_int({Lru, DbName, Iter}, {Tree, Dict} = Cache) -> - case ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}) of + case ets:update_element(couch_dbs, DbName, {#entry.lock, locked}) of true -> - [#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName), + [#entry{db = Db, pid = Pid}] = ets:lookup(couch_dbs, DbName), case couch_db:is_idle(Db) of true -> true = ets:delete(couch_dbs, DbName), true = ets:delete(couch_dbs_pid_to_name, Pid), exit(Pid, kill), {true, {gb_trees:delete(Lru, Tree), dict:erase(DbName, Dict)}}; false -> - true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, nil}), + ElemSpec = {#entry.lock, unlocked}, + true = ets:update_element(couch_dbs, DbName, ElemSpec), couch_stats:increment_counter([couchdb, couch_server, lru_skip]), close_int(gb_trees:next(Iter), update(DbName, Cache)) end; diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl index 24016e05c..efcef714e 100644 --- a/src/couch/src/couch_server.erl +++ b/src/couch/src/couch_server.erl @@ -27,6 +27,7 @@ -export([handle_config_change/5, handle_config_terminate/3]). -include_lib("couch/include/couch_db.hrl"). +-include("couch_server_int.hrl"). -define(MAX_DBS_OPEN, 500). -define(RELISTEN_DELAY, 5000). @@ -74,16 +75,18 @@ sup_start_link() -> open(DbName, Options0) -> Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}), case ets:lookup(couch_dbs, DbName) of - [#db{fd=Fd, fd_monitor=Lock, options=Options} = Db] when Lock =/= locked -> - update_lru(DbName, Options), - {ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}}; + [#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked -> + update_lru(DbName, Entry#entry.db_options), + {ok, Db1} = couch_db:incref(Db0), + couch_db:set_user_ctx(Db1, Ctx); _ -> Options = maybe_add_sys_db_callbacks(DbName, Options0), Timeout = couch_util:get_value(timeout, Options, infinity), Create = couch_util:get_value(create_if_missing, Options, false), case gen_server:call(couch_server, {open, DbName, Options}, Timeout) of - {ok, #db{fd=Fd} = Db} -> - {ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}}; + {ok, Db0} -> + {ok, Db1} = couch_db:incref(Db0), + couch_db:set_user_ctx(Db1, Ctx); {not_found, no_db_file} when Create -> couch_log:warning("creating missing database: ~s", [DbName]), couch_server:create(DbName, Options); @@ -104,9 +107,10 @@ close_lru() -> create(DbName, Options0) -> Options = maybe_add_sys_db_callbacks(DbName, Options0), case gen_server:call(couch_server, {create, DbName, Options}, infinity) of - {ok, #db{fd=Fd} = Db} -> + {ok, Db0} -> Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}), - {ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}}; + {ok, Db1} = couch_db:incref(Db0), + couch_db:set_user_ctx(Db1, Ctx); Error -> Error end. @@ -176,9 +180,9 @@ hash_admin_passwords(Persist) -> close_db_if_idle(DbName) -> case ets:lookup(couch_dbs, DbName) of - [#db{}] -> + [#entry{}] -> gen_server:cast(couch_server, {close_db_if_idle, DbName}); - _ -> + [] -> ok end. @@ -197,7 +201,7 @@ init([]) -> ok = config:listen_for_changes(?MODULE, nil), ok = couch_file:init_delete_dir(RootDir), hash_admin_passwords(), - ets:new(couch_dbs, [set, protected, named_table, {keypos, #db.name}]), + ets:new(couch_dbs, [set, protected, named_table, {keypos, #entry.name}]), ets:new(couch_dbs_pid_to_name, [set, protected, named_table]), process_flag(trap_exit, true), {ok, #server{root_dir=RootDir, @@ -209,8 +213,9 @@ terminate(Reason, Srv) -> couch_log:error("couch_server terminating with ~p, state ~2048p", [Reason, Srv#server{lru = redacted}]), - ets:foldl(fun(#db{main_pid=Pid}, _) -> couch_util:shutdown_sync(Pid) end, - nil, couch_dbs), + ets:foldl(fun(#entry{db = Db}, _) -> + couch_util:shutdown_sync(couch_db:get_pid(Db)) + end, nil, couch_dbs), ok. handle_config_change("couchdb", "database_dir", _, _, _) -> @@ -316,15 +321,13 @@ open_async(Server, From, DbName, Filepath, Options) -> true -> create; false -> open end, - % icky hack of field values - compactor_pid used to store clients - % and fd used for opening request info - true = ets:insert(couch_dbs, #db{ + true = ets:insert(couch_dbs, #entry{ name = DbName, - fd = ReqType, - main_pid = Opener, - compactor_pid = [From], - fd_monitor = locked, - options = Options + pid = Opener, + lock = locked, + waiters = [From], + req_type = ReqType, + db_options = Options }), true = ets:insert(couch_dbs_pid_to_name, {Opener, DbName}), db_opened(Server, Options). @@ -348,16 +351,15 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) -> true = ets:delete(couch_dbs_pid_to_name, FromPid), OpenTime = timer:now_diff(os:timestamp(), T0) / 1000, couch_stats:update_histogram([couchdb, db_open_time], OpenTime), - % icky hack of field values - compactor_pid used to store clients - % and fd used to possibly store a creation request + DbPid = couch_db:get_pid(Db), case ets:lookup(couch_dbs, DbName) of [] -> % db was deleted during async open - exit(Db#db.main_pid, kill), + exit(DbPid, kill), {reply, ok, Server}; - [#db{fd=ReqType, compactor_pid=Froms}] -> - link(Db#db.main_pid), - [gen_server:reply(From, {ok, Db}) || From <- Froms], + [#entry{req_type = ReqType, waiters = Waiters} = Entry] -> + link(DbPid), + [gen_server:reply(Waiter, {ok, Db}) || Waiter <- Waiters], % Cancel the creation request if it exists. case ReqType of {create, DbName, _Filepath, _Options, CrFrom} -> @@ -365,8 +367,15 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) -> _ -> ok end, - true = ets:insert(couch_dbs, Db), - true = ets:insert(couch_dbs_pid_to_name, {Db#db.main_pid, DbName}), + true = ets:insert(couch_dbs, #entry{ + name = DbName, + db = Db, + pid = DbPid, + lock = unlocked, + db_options = Entry#entry.db_options, + start_time = couch_db:get_instance_start_time(Db) + }), + true = ets:insert(couch_dbs_pid_to_name, {DbPid, DbName}), Lru = case couch_db:is_system_db(Db) of false -> couch_lru:insert(DbName, Server#server.lru); @@ -378,13 +387,12 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) -> handle_call({open_result, T0, DbName, {error, eexist}}, From, Server) -> handle_call({open_result, T0, DbName, file_exists}, From, Server); handle_call({open_result, _T0, DbName, Error}, {FromPid, _Tag}, Server) -> - % icky hack of field values - compactor_pid used to store clients case ets:lookup(couch_dbs, DbName) of [] -> % db was deleted during async open {reply, ok, Server}; - [#db{fd=ReqType, compactor_pid=Froms}=Db] -> - [gen_server:reply(From, Error) || From <- Froms], + [#entry{req_type = ReqType, waiters = Waiters} = Entry] -> + [gen_server:reply(Waiter, Error) || Waiter <- Waiters], couch_log:info("open_result error ~p for ~s", [Error, DbName]), true = ets:delete(couch_dbs, DbName), true = ets:delete(couch_dbs_pid_to_name, FromPid), @@ -394,7 +402,7 @@ handle_call({open_result, _T0, DbName, Error}, {FromPid, _Tag}, Server) -> _ -> Server end, - {reply, ok, db_closed(NewServer, Db#db.options)} + {reply, ok, db_closed(NewServer, Entry#entry.db_options)} end; handle_call({open, DbName, Options}, From, Server) -> case ets:lookup(couch_dbs, DbName) of @@ -412,15 +420,14 @@ handle_call({open, DbName, Options}, From, Server) -> Error -> {reply, Error, Server} end; - [#db{compactor_pid = Froms} = Db] when is_list(Froms) -> - % icky hack of field values - compactor_pid used to store clients - true = ets:insert(couch_dbs, Db#db{compactor_pid = [From|Froms]}), - if length(Froms) =< 10 -> ok; true -> + [#entry{waiters = Waiters} = Entry] when is_list(Waiters) -> + true = ets:insert(couch_dbs, Entry#entry{waiters = [From | Waiters]}), + if length(Waiters) =< 10 -> ok; true -> Fmt = "~b clients waiting to open db ~s", - couch_log:info(Fmt, [length(Froms), DbName]) + couch_log:info(Fmt, [length(Waiters), DbName]) end, {noreply, Server}; - [#db{} = Db] -> + [#entry{db = Db}] -> {reply, {ok, Db}, Server} end; handle_call({create, DbName, Options}, From, Server) -> @@ -437,14 +444,13 @@ handle_call({create, DbName, Options}, From, Server) -> CloseError -> {reply, CloseError, Server} end; - [#db{fd=open}=Db] -> + [#entry{req_type = open} = Entry] -> % We're trying to create a database while someone is in % the middle of trying to open it. We allow one creator % to wait while we figure out if it'll succeed. - % icky hack of field values - fd used to store create request CrOptions = [create | Options], - NewDb = Db#db{fd={create, DbName, Filepath, CrOptions, From}}, - true = ets:insert(couch_dbs, NewDb), + Req = {create, DbName, Filepath, CrOptions, From}, + true = ets:insert(couch_dbs, Entry#entry{req_type = Req}), {noreply, Server}; [_AlreadyRunningDb] -> {reply, file_exists, Server} @@ -460,18 +466,17 @@ handle_call({delete, DbName, Options}, _From, Server) -> Server2 = case ets:lookup(couch_dbs, DbName) of [] -> Server; - [#db{main_pid=Pid, compactor_pid=Froms} = Db] when is_list(Froms) -> - % icky hack of field values - compactor_pid used to store clients + [#entry{pid = Pid, waiters = Waiters} = Entry] when is_list(Waiters) -> true = ets:delete(couch_dbs, DbName), true = ets:delete(couch_dbs_pid_to_name, Pid), exit(Pid, kill), - [gen_server:reply(F, not_found) || F <- Froms], - db_closed(Server, Db#db.options); - [#db{main_pid=Pid} = Db] -> + [gen_server:reply(Waiter, not_found) || Waiter <- Waiters], + db_closed(Server, Entry#entry.db_options); + [#entry{pid = Pid} = Entry] -> true = ets:delete(couch_dbs, DbName), true = ets:delete(couch_dbs_pid_to_name, Pid), exit(Pid, kill), - db_closed(Server, Db#db.options) + db_closed(Server, Entry#entry.db_options) end, %% Delete any leftover compaction files. If we don't do this a @@ -497,11 +502,12 @@ handle_call({delete, DbName, Options}, _From, Server) -> Error -> {reply, Error, Server} end; -handle_call({db_updated, #db{}=Db}, _From, Server0) -> - #db{name = DbName, instance_start_time = StartTime} = Db, - Server = try ets:lookup_element(couch_dbs, DbName, #db.instance_start_time) of +handle_call({db_updated, Db}, _From, Server0) -> + DbName = couch_db:name(Db), + StartTime = couch_db:get_instance_start_time(Db), + Server = try ets:lookup_element(couch_dbs, DbName, #entry.start_time) of StartTime -> - true = ets:insert(couch_dbs, Db), + true = ets:update_element(couch_dbs, DbName, {#entry.db, Db}), Lru = case couch_db:is_system_db(Db) of false -> couch_lru:update(DbName, Server0#server.lru); true -> Server0#server.lru @@ -519,17 +525,19 @@ handle_cast({update_lru, DbName}, #server{lru = Lru, update_lru_on_read=true} = handle_cast({update_lru, _DbName}, Server) -> {noreply, Server}; handle_cast({close_db_if_idle, DbName}, Server) -> - case ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}) of + case ets:update_element(couch_dbs, DbName, {#entry.lock, locked}) of true -> - [#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName), + [#entry{db = Db, db_options = DbOpts}] = ets:lookup(couch_dbs, DbName), case couch_db:is_idle(Db) of true -> + DbPid = couch_db:get_pid(Db), true = ets:delete(couch_dbs, DbName), - true = ets:delete(couch_dbs_pid_to_name, Pid), - exit(Pid, kill), - {noreply, db_closed(Server, Db#db.options)}; + true = ets:delete(couch_dbs_pid_to_name, DbPid), + exit(DbPid, kill), + {noreply, db_closed(Server, DbOpts)}; false -> - true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, nil}), + true = ets:update_element( + couch_dbs, DbName, {#entry.lock, unlocked}), {noreply, Server} end; false -> @@ -547,22 +555,19 @@ handle_info({'EXIT', _Pid, config_change}, Server) -> handle_info({'EXIT', Pid, Reason}, Server) -> case ets:lookup(couch_dbs_pid_to_name, Pid) of [{Pid, DbName}] -> - [#db{compactor_pid=Froms}=Db] = ets:lookup(couch_dbs, DbName), + [#entry{waiters = Waiters} = Entry] = ets:lookup(couch_dbs, DbName), if Reason /= snappy_nif_not_loaded -> ok; true -> Msg = io_lib:format("To open the database `~s`, Apache CouchDB " "must be built with Erlang OTP R13B04 or higher.", [DbName]), couch_log:error(Msg, []) end, couch_log:info("db ~s died with reason ~p", [DbName, Reason]), - % icky hack of field values - compactor_pid used to store clients - if is_list(Froms) -> - [gen_server:reply(From, Reason) || From <- Froms]; - true -> - ok + if not is_list(Waiters) -> ok; true -> + [gen_server:reply(Waiter, Reason) || Waiter <- Waiters] end, true = ets:delete(couch_dbs, DbName), true = ets:delete(couch_dbs_pid_to_name, Pid), - {noreply, db_closed(Server, Db#db.options)}; + {noreply, db_closed(Server, Entry#entry.db_options)}; [] -> {noreply, Server} end; diff --git a/src/couch/src/couch_server_int.hrl b/src/couch/src/couch_server_int.hrl new file mode 100644 index 000000000..537a6abb9 --- /dev/null +++ b/src/couch/src/couch_server_int.hrl @@ -0,0 +1,23 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +-record(entry, { + name, + db, + pid, + lock, + waiters, + req_type, + db_options, + start_time +}). diff --git a/src/couch/src/couch_users_db.erl b/src/couch/src/couch_users_db.erl index 6f7b9af73..c7b41f1fc 100644 --- a/src/couch/src/couch_users_db.erl +++ b/src/couch/src/couch_users_db.erl @@ -39,8 +39,8 @@ % -> 404 // Not Found % Else % -> save_doc -before_doc_update(Doc, #db{user_ctx = UserCtx} = Db) -> - #user_ctx{name=Name} = UserCtx, +before_doc_update(Doc, Db) -> + #user_ctx{name=Name} = couch_db:get_user_ctx(Db), DocName = get_doc_name(Doc), case (catch couch_db:check_is_admin(Db)) of ok -> @@ -108,8 +108,8 @@ after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, Db) -> throw({forbidden, <<"Only administrators can view design docs in the users database.">>}) end; -after_doc_read(Doc, #db{user_ctx = UserCtx} = Db) -> - #user_ctx{name=Name} = UserCtx, +after_doc_read(Doc, Db) -> + #user_ctx{name=Name} = couch_db:get_user_ctx(Db), DocName = get_doc_name(Doc), case (catch couch_db:check_is_admin(Db)) of ok -> diff --git a/src/couch/src/couch_util.erl b/src/couch/src/couch_util.erl index 4b848616d..42d10ec1e 100644 --- a/src/couch/src/couch_util.erl +++ b/src/couch/src/couch_util.erl @@ -199,7 +199,9 @@ json_apply_field({Key, NewValue}, [{OtherKey, OtherVal} | Headers], Acc) -> json_apply_field({Key, NewValue}, [], Acc) -> {[{Key, NewValue}|Acc]}. -json_user_ctx(#db{name=ShardName, user_ctx=Ctx}) -> +json_user_ctx(Db) -> + ShardName = couch_db:name(Db), + Ctx = couch_db:get_user_ctx(Db), {[{<<"db">>, mem3:dbname(ShardName)}, {<<"name">>,Ctx#user_ctx.name}, {<<"roles">>,Ctx#user_ctx.roles}]}. @@ -456,9 +458,7 @@ encode_doc_id(Id) -> url_encode(Id). -with_db(Db, Fun) when is_record(Db, db) -> - Fun(Db); -with_db(DbName, Fun) -> +with_db(DbName, Fun) when is_binary(DbName) -> case couch_db:open_int(DbName, [?ADMIN_CTX]) of {ok, Db} -> try @@ -468,6 +468,13 @@ with_db(DbName, Fun) -> end; Else -> throw(Else) + end; +with_db(Db, Fun) -> + case couch_db:is_db(Db) of + true -> + Fun(Db); + false -> + erlang:error({invalid_db, Db}) end. rfc1123_date() -> diff --git a/src/couch/src/test_util.erl b/src/couch/src/test_util.erl index e652dd9b3..8a05e8830 100644 --- a/src/couch/src/test_util.erl +++ b/src/couch/src/test_util.erl @@ -13,6 +13,8 @@ -module(test_util). -include_lib("couch/include/couch_eunit.hrl"). +-include("couch_db.hrl"). +-include("couch_db_int.hrl"). -export([init_code_path/0]). -export([source_file/1, build_file/1]). @@ -32,6 +34,8 @@ -export([start/1, start/2, start/3, stop/1]). +-export([fake_db/1]). + -record(test_context, {mocked = [], started = [], module}). -define(DEFAULT_APPS, @@ -230,6 +234,16 @@ stop(#test_context{mocked = Mocked, started = Apps}) -> meck:unload(Mocked), stop_applications(Apps). +fake_db(Fields) -> + Indexes = lists:zip( + record_info(fields, db), + lists:seq(2, record_info(size, db)) + ), + lists:foldl(fun({FieldName, Value}, Acc) -> + Idx = couch_util:get_value(FieldName, Indexes), + setelement(Idx, Acc, Value) + end, #db{}, Fields). + now_us() -> {MegaSecs, Secs, MicroSecs} = now(), (MegaSecs * 1000000 + Secs) * 1000000 + MicroSecs. diff --git a/src/couch/test/couch_auth_cache_tests.erl b/src/couch/test/couch_auth_cache_tests.erl index 6328c9b97..6916045c3 100644 --- a/src/couch/test/couch_auth_cache_tests.erl +++ b/src/couch/test/couch_auth_cache_tests.erl @@ -276,7 +276,7 @@ hash_password(Password) -> shutdown_db(DbName) -> {ok, AuthDb} = couch_db:open_int(DbName, [?ADMIN_CTX]), ok = couch_db:close(AuthDb), - couch_util:shutdown_sync(AuthDb#db.main_pid), + couch_util:shutdown_sync(couch_db:get_pid(AuthDb)), ok = timer:sleep(1000). get_doc_rev(DbName, UserName) -> diff --git a/src/couch/test/couch_changes_tests.erl b/src/couch/test/couch_changes_tests.erl index b2da3fea4..673f2faad 100644 --- a/src/couch/test/couch_changes_tests.erl +++ b/src/couch/test/couch_changes_tests.erl @@ -652,7 +652,7 @@ should_filter_by_user_ctx({DbName, _}) -> ]}), ChArgs = #changes_args{filter = "app/valid"}, UserCtx = #user_ctx{name = <<"doc3">>, roles = []}, - DbRec = #db{name = DbName, user_ctx = UserCtx}, + {ok, DbRec} = couch_db:clustered_db(DbName, UserCtx), Req = {json_req, {[{ <<"userCtx">>, couch_util:json_user_ctx(DbRec) }]}}, diff --git a/src/couch/test/couch_compress_tests.erl b/src/couch/test/couch_compress_tests.erl index 6d6e6a792..addb9a0e2 100644 --- a/src/couch/test/couch_compress_tests.erl +++ b/src/couch/test/couch_compress_tests.erl @@ -72,3 +72,14 @@ is_compressed_test_() -> ?_assertError(invalid_compression, couch_compress:is_compressed(?CORRUPT, snappy)) ]. + +uncompressed_size_test_() -> + [ + ?_assertEqual(49, couch_compress:uncompressed_size(?NONE)), + ?_assertEqual(49, couch_compress:uncompressed_size(?DEFLATE)), + ?_assertEqual(49, couch_compress:uncompressed_size(?SNAPPY)), + ?_assertEqual(5, couch_compress:uncompressed_size( + couch_compress:compress(x, {deflate, 9}))), + ?_assertError(invalid_compression, + couch_compress:uncompressed_size(?CORRUPT)) + ]. diff --git a/src/couch/test/couch_db_plugin_tests.erl b/src/couch/test/couch_db_plugin_tests.erl index ea9b230b1..94dd3dfa5 100644 --- a/src/couch/test/couch_db_plugin_tests.erl +++ b/src/couch/test/couch_db_plugin_tests.erl @@ -43,6 +43,7 @@ data_providers() -> []. data_subscriptions() -> []. processes() -> []. notify(_, _, _) -> ok. +fake_db() -> element(2, couch_db:clustered_db(fake, totes_fake)). setup() -> couch_tests:setup([ @@ -133,33 +134,33 @@ validate_dbname_pass() -> before_doc_update_match() -> ?assertMatch( {true, [before_doc_update, doc]}, - couch_db_plugin:before_doc_update(#db{}, {true, [doc]})). + couch_db_plugin:before_doc_update(fake_db(), {true, [doc]})). before_doc_update_no_match() -> ?assertMatch( {false, [doc]}, - couch_db_plugin:before_doc_update(#db{}, {false, [doc]})). + couch_db_plugin:before_doc_update(fake_db(), {false, [doc]})). before_doc_update_throw() -> ?assertThrow( before_doc_update, - couch_db_plugin:before_doc_update(#db{}, {fail, [doc]})). + couch_db_plugin:before_doc_update(fake_db(), {fail, [doc]})). after_doc_read_match() -> ?assertMatch( {true, [after_doc_read, doc]}, - couch_db_plugin:after_doc_read(#db{}, {true, [doc]})). + couch_db_plugin:after_doc_read(fake_db(), {true, [doc]})). after_doc_read_no_match() -> ?assertMatch( {false, [doc]}, - couch_db_plugin:after_doc_read(#db{}, {false, [doc]})). + couch_db_plugin:after_doc_read(fake_db(), {false, [doc]})). after_doc_read_throw() -> ?assertThrow( after_doc_read, - couch_db_plugin:after_doc_read(#db{}, {fail, [doc]})). + couch_db_plugin:after_doc_read(fake_db(), {fail, [doc]})). validate_docid_match() -> diff --git a/src/couch/test/couch_server_tests.erl b/src/couch/test/couch_server_tests.erl index c8f8381d7..c52b3f6b0 100644 --- a/src/couch/test/couch_server_tests.erl +++ b/src/couch/test/couch_server_tests.erl @@ -32,8 +32,9 @@ setup(_) -> setup(). teardown(Db) -> + FilePath = couch_db:get_filepath(Db), (catch couch_db:close(Db)), - (catch file:delete(Db#db.filepath)). + (catch file:delete(FilePath)). teardown(rename, Db) -> config:set("couchdb", "enable_database_recovery", "false", false), @@ -61,7 +62,9 @@ make_test_case(Mod, Funs) -> {foreachx, fun setup/1, fun teardown/2, [{Mod, Fun} || Fun <- Funs]} }. -should_rename_on_delete(_, #db{filepath = Origin, name = DbName}) -> +should_rename_on_delete(_, Db) -> + DbName = couch_db:name(Db), + Origin = couch_db:get_filepath(Db), ?_test(begin ?assert(filelib:is_regular(Origin)), ?assertMatch(ok, couch_server:delete(DbName, [])), @@ -74,7 +77,9 @@ should_rename_on_delete(_, #db{filepath = Origin, name = DbName}) -> ?assert(filelib:is_regular(Renamed)) end). -should_delete(_, #db{filepath = Origin, name = DbName}) -> +should_delete(_, Db) -> + DbName = couch_db:name(Db), + Origin = couch_db:get_filepath(Db), ?_test(begin ?assert(filelib:is_regular(Origin)), ?assertMatch(ok, couch_server:delete(DbName, [])), diff --git a/src/couch/test/couchdb_compaction_daemon_tests.erl b/src/couch/test/couchdb_compaction_daemon_tests.erl index 856a53d05..0d7a46862 100644 --- a/src/couch/test/couchdb_compaction_daemon_tests.erl +++ b/src/couch/test/couchdb_compaction_daemon_tests.erl @@ -175,7 +175,7 @@ update(DbName) -> lists:foreach(fun(_) -> Doc = couch_doc:from_json_obj({[{<<"_id">>, couch_uuids:new()}]}), {ok, _} = couch_db:update_docs(Db, [Doc]), - query_view(Db#db.name) + query_view(couch_db:name(Db)) end, lists:seq(1, 200)), couch_db:close(Db). @@ -213,7 +213,7 @@ spawn_compaction_monitor(DbName) -> {Pid, Ref} = spawn_monitor(fun() -> DaemonPid = whereis(couch_compaction_daemon), DbPid = couch_util:with_db(DbName, fun(Db) -> - Db#db.main_pid + couch_db:get_pid(Db) end), {ok, ViewPid} = couch_index_server:get_index(couch_mrview_index, DbName, <<"_design/foo">>), diff --git a/src/couch/test/couchdb_file_compression_tests.erl b/src/couch/test/couchdb_file_compression_tests.erl index 09fead582..8f0fe5bf1 100644 --- a/src/couch/test/couchdb_file_compression_tests.erl +++ b/src/couch/test/couchdb_file_compression_tests.erl @@ -157,7 +157,7 @@ compare_compression_methods(DbName) -> ?assert(DbSizeDeflate1 > DbSizeDeflate9), ?assert(ViewSizeDeflate1 > ViewSizeDeflate9), - ?assert(ExternalSizePreCompact =:= ExternalSizeNone), + ?assert(ExternalSizePreCompact >= ExternalSizeNone), ?assert(ExternalSizeNone =:= ExternalSizeSnappy), ?assert(ExternalSizeNone =:= ExternalSizeDeflate9), ?assert(ViewExternalSizeNone =:= ViewExternalSizeSnappy), diff --git a/src/couch/test/couchdb_views_tests.erl b/src/couch/test/couchdb_views_tests.erl index ae4029513..c0505f3db 100644 --- a/src/couch/test/couchdb_views_tests.erl +++ b/src/couch/test/couchdb_views_tests.erl @@ -348,11 +348,11 @@ couchdb_1283() -> ]}), {ok, _} = couch_db:update_doc(MDb1, DDoc, []), ok = populate_db(MDb1, 100, 100), - query_view(MDb1#db.name, "foo", "foo"), + query_view(couch_db:name(MDb1), "foo", "foo"), ok = couch_db:close(MDb1), {ok, Pid} = couch_index_server:get_index( - couch_mrview_index, MDb1#db.name, <<"_design/foo">>), + couch_mrview_index, couch_db:name(MDb1), <<"_design/foo">>), % Start and pause compacton WaitRef = erlang:make_ref(), @@ -522,7 +522,8 @@ view_cleanup(DbName) -> count_users(DbName) -> {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), - {monitored_by, Monitors} = erlang:process_info(Db#db.main_pid, monitored_by), + DbPid = couch_db:get_pid(Db), + {monitored_by, Monitors} = process_info(DbPid, monitored_by), CouchFiles = [P || P <- Monitors, couch_file:process_info(P) =/= undefined], ok = couch_db:close(Db), length(lists:usort(Monitors) -- [self() | CouchFiles]). @@ -546,9 +547,10 @@ backup_db_file(DbName) -> restore_backup_db_file(DbName) -> DbDir = config:get("couchdb", "database_dir"), - {ok, #db{main_pid = UpdaterPid} = Db} = couch_db:open_int(DbName, []), + {ok, Db} = couch_db:open_int(DbName, []), ok = couch_db:close(Db), - exit(UpdaterPid, shutdown), + DbPid = couch_db:get_pid(Db), + exit(DbPid, shutdown), DbFile = filename:join([DbDir, ?b2l(DbName) ++ ".couch"]), ok = file:delete(DbFile), @@ -556,9 +558,13 @@ restore_backup_db_file(DbName) -> test_util:wait(fun() -> case couch_server:open(DbName, [{timeout, ?TIMEOUT}]) of - {ok, #db{main_pid = UpdaterPid}} -> wait; - {ok, _} -> ok; - Else -> Else + {ok, WaitDb} -> + case couch_db:get_pid(WaitDb) == DbPid of + true -> wait; + false -> ok + end; + Else -> + Else end end, ?TIMEOUT, ?DELAY). @@ -576,7 +582,8 @@ wait_db_compact_done(_DbName, 0) -> wait_db_compact_done(DbName, N) -> {ok, Db} = couch_db:open_int(DbName, []), ok = couch_db:close(Db), - case is_pid(Db#db.compactor_pid) of + CompactorPid = couch_db:get_compactor_pid(Db), + case is_pid(CompactorPid) of false -> ok; true -> diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl index 8225a90a3..a33c1e490 100644 --- a/src/couch_index/src/couch_index_server.erl +++ b/src/couch_index/src/couch_index_server.erl @@ -60,11 +60,9 @@ validate(DbName, DDoc) -> lists:foreach(ValidateFun, EnabledIndexers). -get_index(Module, #db{name = <<"shards/", _/binary>> = DbName}, DDoc) -> - case is_record(DDoc, doc) of - true -> get_index(Module, DbName, DDoc, nil); - false -> get_index(Module, DbName, DDoc) - end; +get_index(Module, <<"shards/", _/binary>> = DbName, DDoc) + when is_record(DDoc, doc) -> + get_index(Module, DbName, DDoc, nil); get_index(Module, <<"shards/", _/binary>> = DbName, DDoc) -> {Pid, Ref} = spawn_monitor(fun() -> exit(fabric:open_doc(mem3:dbname(DbName), DDoc, [ejson_body, ?ADMIN_CTX])) @@ -77,9 +75,10 @@ get_index(Module, <<"shards/", _/binary>> = DbName, DDoc) -> erlang:demonitor(Ref, [flush]), {error, timeout} end; - -get_index(Module, DbName, DDoc) -> - get_index(Module, DbName, DDoc, nil). +get_index(Module, DbName, DDoc) when is_binary(DbName) -> + get_index(Module, DbName, DDoc, nil); +get_index(Module, Db, DDoc) -> + get_index(Module, couch_db:name(Db), DDoc). get_index(Module, DbName, DDoc, Fun) when is_binary(DbName) -> diff --git a/src/couch_index/src/couch_index_util.erl b/src/couch_index/src/couch_index_util.erl index 5694641ca..dcb33b5b0 100644 --- a/src/couch_index/src/couch_index_util.erl +++ b/src/couch_index/src/couch_index_util.erl @@ -25,7 +25,7 @@ root_dir() -> index_dir(Module, DbName) when is_binary(DbName) -> DbDir = "." ++ binary_to_list(DbName) ++ "_design", filename:join([root_dir(), DbDir, Module]); -index_dir(Module, #db{}=Db) -> +index_dir(Module, Db) -> index_dir(Module, couch_db:name(Db)). diff --git a/src/couch_index/test/couch_index_compaction_tests.erl b/src/couch_index/test/couch_index_compaction_tests.erl index 0048b338e..062be872a 100644 --- a/src/couch_index/test/couch_index_compaction_tests.erl +++ b/src/couch_index/test/couch_index_compaction_tests.erl @@ -25,7 +25,8 @@ setup() -> ?assertNot(is_opened(Db)), {Db, IndexerPid}. -fake_index(#db{name = DbName} = Db) -> +fake_index(Db) -> + DbName = couch_db:name(Db), ok = meck:new([test_index], [non_strict]), ok = meck:expect(test_index, init, ['_', '_'], {ok, 10}), ok = meck:expect(test_index, open, fun(_Db, State) -> diff --git a/src/couch_index/test/couch_index_ddoc_updated_tests.erl b/src/couch_index/test/couch_index_ddoc_updated_tests.erl index f42c9a29a..d1bbc43d2 100644 --- a/src/couch_index/test/couch_index_ddoc_updated_tests.erl +++ b/src/couch_index/test/couch_index_ddoc_updated_tests.erl @@ -91,7 +91,7 @@ check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) -> % assert that all index processes exit after ddoc updated ok = meck:reset(test_index), couch_index_server:handle_db_event( - DbShard#db.name, {ddoc_updated, DDocID}, {st, ""}), + couch_db:name(DbShard), {ddoc_updated, DDocID}, {st, ""}), ok = meck:wait(N, test_index, init, ['_', '_'], 5000), IndexesAfter = get_indexes_by_ddoc(DDocID, 0), diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl index 11c209b43..07e36687d 100644 --- a/src/couch_mrview/src/couch_mrview.erl +++ b/src/couch_mrview/src/couch_mrview.erl @@ -364,15 +364,12 @@ get_view_info(Db, DDoc, VName) -> %% @doc refresh a view index -refresh(#db{name=DbName}, DDoc) -> - refresh(DbName, DDoc); - -refresh(Db, DDoc) -> - UpdateSeq = couch_util:with_db(Db, fun(WDb) -> +refresh(DbName, DDoc) when is_binary(DbName)-> + UpdateSeq = couch_util:with_db(DbName, fun(WDb) -> couch_db:get_update_seq(WDb) end), - case couch_index_server:get_index(couch_mrview_index, Db, DDoc) of + case couch_index_server:get_index(couch_mrview_index, DbName, DDoc) of {ok, Pid} -> case catch couch_index:get_state(Pid, UpdateSeq) of {ok, _} -> ok; @@ -380,7 +377,10 @@ refresh(Db, DDoc) -> end; Error -> {error, Error} - end. + end; + +refresh(Db, DDoc) -> + refresh(couch_db:name(Db), DDoc). compact(Db, DDoc) -> compact(Db, DDoc, []). @@ -668,7 +668,7 @@ get_reduce_fun(#mrargs{extra = Extra}) -> end. -get_total_rows(#db{} = Db, #mrargs{extra = Extra}) -> +get_total_rows(Db, #mrargs{extra = Extra}) -> case couch_util:get_value(namespace, Extra) of <<"_local">> -> null; @@ -678,7 +678,7 @@ get_total_rows(#db{} = Db, #mrargs{extra = Extra}) -> end. -get_update_seq(#db{} = Db, #mrargs{extra = Extra}) -> +get_update_seq(Db, #mrargs{extra = Extra}) -> case couch_util:get_value(namespace, Extra) of <<"_local">> -> null; diff --git a/src/couch_mrview/src/couch_mrview_compactor.erl b/src/couch_mrview/src/couch_mrview_compactor.erl index c1b2fbc21..e9be89c71 100644 --- a/src/couch_mrview/src/couch_mrview_compactor.erl +++ b/src/couch_mrview/src/couch_mrview_compactor.erl @@ -53,8 +53,7 @@ compact(State) -> {ok, Fd} = couch_mrview_util:open_file(CompactFName), ESt = couch_mrview_util:reset_index(Db, Fd, State), - {ok, DbReduce} = couch_btree:full_reduce(Db#db.id_tree), - Count = element(1, DbReduce), + {ok, Count} = couch_db:get_doc_count(Db), {ESt, Count} end), diff --git a/src/couch_mrview/src/couch_mrview_http.erl b/src/couch_mrview/src/couch_mrview_http.erl index a94f48df9..9ad50eeef 100644 --- a/src/couch_mrview/src/couch_mrview_http.erl +++ b/src/couch_mrview/src/couch_mrview_http.erl @@ -103,11 +103,11 @@ handle_view_changes_req(#httpd{path_parts=[_,<<"_design">>,DDocName,<<"_view_cha handle_view_req(#httpd{method='GET', path_parts=[_, _, DDocName, _, VName, <<"_info">>]}=Req, Db, _DDoc) -> - + DbName = couch_db:name(Db), DDocId = <<"_design/", DDocName/binary >>, - {ok, Info} = couch_mrview:get_view_info(Db#db.name, DDocId, VName), + {ok, Info} = couch_mrview:get_view_info(DbName, DDocId, VName), - FinalInfo = [{db_name, Db#db.name}, + FinalInfo = [{db_name, DbName}, {ddoc, DDocId}, {view, VName}] ++ Info, chttpd:send_json(Req, 200, {FinalInfo}); @@ -212,7 +212,7 @@ is_restricted(Db, _) -> couch_db:is_system_db(Db). is_public_fields_configured(Db) -> - DbName = ?b2l(Db#db.name), + DbName = ?b2l(couch_db:name(Db)), case config:get("couch_httpd_auth", "authentication_db", "_users") of DbName -> UsersDbPublic = config:get("couch_httpd_auth", "users_db_public", "false"), @@ -237,7 +237,7 @@ do_all_docs_req(Req, Db, Keys, NS) -> {ok, Resp} = couch_httpd:etag_maybe(Req, fun() -> Max = chttpd:chunked_response_buffer_size(), VAcc0 = #vacc{db=Db, req=Req, threshold=Max}, - DbName = ?b2l(Db#db.name), + DbName = ?b2l(couch_db:name(Db)), UsersDbName = config:get("couch_httpd_auth", "authentication_db", "_users"), diff --git a/src/couch_mrview/src/couch_mrview_show.erl b/src/couch_mrview/src/couch_mrview_show.erl index 3a602ad21..2411c2ca2 100644 --- a/src/couch_mrview/src/couch_mrview_show.erl +++ b/src/couch_mrview/src/couch_mrview_show.erl @@ -364,13 +364,17 @@ json_apply_field({Key, NewValue}, [], Acc) -> % This loads the db info if we have a fully loaded db record, but we might not % have the db locally on this node, so then load the info through fabric. -json_req_obj(Req, #db{main_pid=Pid}=Db) when is_pid(Pid) -> - chttpd_external:json_req_obj(Req, Db); json_req_obj(Req, Db) -> - % use a separate process because we're already in a receive loop, and - % json_req_obj calls fabric:get_db_info() - spawn_monitor(fun() -> exit(chttpd_external:json_req_obj(Req, Db)) end), - receive {'DOWN', _, _, _, JsonReq} -> JsonReq end. + case couch_db:is_clustered(Db) of + true -> + % use a separate process because we're already in a receive loop, + % and json_req_obj calls fabric:get_db_info() + JRO = fun() -> exit(chttpd_external:json_req_obj(Req, Db)) end, + spawn_monitor(JRO), + receive {'DOWN', _, _, _, JsonReq} -> JsonReq end; + false -> + chttpd_external:json_req_obj(Req, Db) + end. last_chunk(Req, undefined) -> chttpd:send_response(Req, 200, [], <<"">>); diff --git a/src/couch_mrview/test/couch_mrview_all_docs_tests.erl b/src/couch_mrview/test/couch_mrview_all_docs_tests.erl index 5e352797f..bf8eb7e5b 100644 --- a/src/couch_mrview/test/couch_mrview_all_docs_tests.erl +++ b/src/couch_mrview/test/couch_mrview_all_docs_tests.erl @@ -25,7 +25,7 @@ setup() -> teardown(Db) -> couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. diff --git a/src/couch_mrview/test/couch_mrview_changes_since_tests.erl b/src/couch_mrview/test/couch_mrview_changes_since_tests.erl index 1e31b3968..d670e109b 100644 --- a/src/couch_mrview/test/couch_mrview_changes_since_tests.erl +++ b/src/couch_mrview/test/couch_mrview_changes_since_tests.erl @@ -19,7 +19,7 @@ teardown(Db) -> couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. changes_since_basic_test_() -> diff --git a/src/couch_mrview/test/couch_mrview_collation_tests.erl b/src/couch_mrview/test/couch_mrview_collation_tests.erl index c4a714d1e..5c8cb54b1 100644 --- a/src/couch_mrview/test/couch_mrview_collation_tests.erl +++ b/src/couch_mrview/test/couch_mrview_collation_tests.erl @@ -64,7 +64,7 @@ setup() -> teardown(Db) -> couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. diff --git a/src/couch_mrview/test/couch_mrview_compact_tests.erl b/src/couch_mrview/test/couch_mrview_compact_tests.erl index 40877c80e..7664becdc 100644 --- a/src/couch_mrview/test/couch_mrview_compact_tests.erl +++ b/src/couch_mrview/test/couch_mrview_compact_tests.erl @@ -26,7 +26,7 @@ setup() -> teardown(Db) -> meck:unload(), couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. diff --git a/src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl b/src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl index cc3844dbd..4310157eb 100644 --- a/src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl +++ b/src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl @@ -60,7 +60,7 @@ setup() -> teardown(Db) -> meck:unload(couch_index_updater), couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. diff --git a/src/couch_mrview/test/couch_mrview_ddoc_validation_tests.erl b/src/couch_mrview/test/couch_mrview_ddoc_validation_tests.erl index c2038ddfb..ce2be8904 100644 --- a/src/couch_mrview/test/couch_mrview_ddoc_validation_tests.erl +++ b/src/couch_mrview/test/couch_mrview_ddoc_validation_tests.erl @@ -23,7 +23,7 @@ setup() -> teardown(Db) -> couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. ddoc_validation_test_() -> diff --git a/src/couch_mrview/test/couch_mrview_index_changes_tests.erl b/src/couch_mrview/test/couch_mrview_index_changes_tests.erl index 8f0c296aa..2701e0c22 100644 --- a/src/couch_mrview/test/couch_mrview_index_changes_tests.erl +++ b/src/couch_mrview/test/couch_mrview_index_changes_tests.erl @@ -22,7 +22,7 @@ setup() -> teardown(Db) -> couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. changes_index_test() -> diff --git a/src/couch_mrview/test/couch_mrview_index_info_tests.erl b/src/couch_mrview/test/couch_mrview_index_info_tests.erl index 3f88972ea..c994df9d3 100644 --- a/src/couch_mrview/test/couch_mrview_index_info_tests.erl +++ b/src/couch_mrview/test/couch_mrview_index_info_tests.erl @@ -28,7 +28,7 @@ setup() -> teardown({Db, _}) -> couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. diff --git a/src/couch_mrview/test/couch_mrview_local_docs_tests.erl b/src/couch_mrview/test/couch_mrview_local_docs_tests.erl index f18f66e4e..c96b98875 100644 --- a/src/couch_mrview/test/couch_mrview_local_docs_tests.erl +++ b/src/couch_mrview/test/couch_mrview_local_docs_tests.erl @@ -25,7 +25,7 @@ setup() -> teardown(Db) -> couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. diff --git a/src/couch_mrview/test/couch_mrview_map_views_tests.erl b/src/couch_mrview/test/couch_mrview_map_views_tests.erl index 3a199288d..229af183d 100644 --- a/src/couch_mrview/test/couch_mrview_map_views_tests.erl +++ b/src/couch_mrview/test/couch_mrview_map_views_tests.erl @@ -24,7 +24,7 @@ setup() -> teardown(Db) -> couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. diff --git a/src/couch_mrview/test/couch_mrview_red_views_tests.erl b/src/couch_mrview/test/couch_mrview_red_views_tests.erl index 310078597..b83686113 100644 --- a/src/couch_mrview/test/couch_mrview_red_views_tests.erl +++ b/src/couch_mrview/test/couch_mrview_red_views_tests.erl @@ -24,7 +24,7 @@ setup() -> teardown(Db) -> couch_db:close(Db), - couch_server:delete(Db#db.name, [?ADMIN_CTX]), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), ok. diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl index a2ef60fa3..ab8eb7f29 100644 --- a/src/couch_replicator/src/couch_replicator_api_wrap.erl +++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl @@ -60,11 +60,11 @@ db_uri(#httpdb{url = Url}) -> couch_util:url_strip_password(Url); -db_uri(#db{name = Name}) -> - db_uri(Name); +db_uri(DbName) when is_binary(DbName) -> + ?b2l(DbName); -db_uri(DbName) -> - ?b2l(DbName). +db_uri(Db) -> + db_uri(couch_db:name(Db)). db_open(Db, Options) -> @@ -153,10 +153,12 @@ get_db_info(#httpdb{} = Db) -> fun(200, _, {Props}) -> {ok, Props} end); -get_db_info(#db{name = DbName, user_ctx = UserCtx}) -> - {ok, Db} = couch_db:open(DbName, [{user_ctx, UserCtx}]), - {ok, Info} = couch_db:get_db_info(Db), - couch_db:close(Db), +get_db_info(Db) -> + DbName = couch_db:name(Db), + UserCtx = couch_db:get_user_ctx(Db), + {ok, InfoDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]), + {ok, Info} = couch_db:get_db_info(InfoDb), + couch_db:close(InfoDb), {ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}. @@ -176,8 +178,10 @@ get_pending_count(#httpdb{} = Db, Seq) -> send_req(Db, Options, fun(200, _, {Props}) -> {ok, couch_util:get_value(<<"pending">>, Props, null)} end); -get_pending_count(#db{name=DbName}=Db, Seq) when is_number(Seq) -> - {ok, CountDb} = couch_db:open(DbName, [{user_ctx, Db#db.user_ctx}]), +get_pending_count(Db, Seq) when is_number(Seq) -> + DbName = couch_db:name(Db), + UserCtx = couch_db:get_user_ctx(Db), + {ok, CountDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]), Pending = couch_db:count_changes_since(CountDb, Seq), couch_db:close(CountDb), {ok, Pending}. @@ -189,7 +193,8 @@ get_view_info(#httpdb{} = Db, DDocId, ViewName) -> {VInfo} = couch_util:get_value(<<"view_index">>, Props, {[]}), {ok, VInfo} end); -get_view_info(#db{name = DbName}, DDocId, ViewName) -> +get_view_info(Db, DDocId, ViewName) -> + DbName = couch_db:name(Db), {ok, VInfo} = couch_mrview:get_view_info(DbName, DDocId, ViewName), {ok, [{couch_util:to_binary(K), V} || {K, V} <- VInfo]}. diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.hrl b/src/couch_replicator/src/couch_replicator_api_wrap.hrl index fc940545a..d2e0fdff5 100644 --- a/src/couch_replicator/src/couch_replicator_api_wrap.hrl +++ b/src/couch_replicator/src/couch_replicator_api_wrap.hrl @@ -21,7 +21,7 @@ ], timeout, % milliseconds ibrowse_options = [], - retries = 10, + retries = 5, wait = 250, % milliseconds httpc_pool = nil, http_connections, diff --git a/src/couch_replicator/src/couch_replicator_clustering.erl b/src/couch_replicator/src/couch_replicator_clustering.erl index 7618f24d6..ed01465d5 100644 --- a/src/couch_replicator/src/couch_replicator_clustering.erl +++ b/src/couch_replicator/src/couch_replicator_clustering.erl @@ -28,6 +28,7 @@ -behaviour(gen_server). -behaviour(config_listener). +-behaviour(mem3_cluster). -export([ start_link/0 @@ -55,6 +56,12 @@ handle_config_terminate/3 ]). +% mem3_cluster callbacks +-export([ + cluster_stable/1, + cluster_unstable/1 +]). + -include_lib("couch/include/couch_db.hrl"). -include_lib("mem3/include/mem3.hrl"). @@ -63,11 +70,8 @@ -define(RELISTEN_DELAY, 5000). -record(state, { - start_time :: erlang:timestamp(), - last_change :: erlang:timestamp(), - period = ?DEFAULT_QUIET_PERIOD :: non_neg_integer(), - start_period = ?DEFAULT_START_PERIOD :: non_neg_integer(), - timer :: reference() + mem3_cluster_pid :: pid(), + cluster_stable :: boolean() }). @@ -115,64 +119,55 @@ link_cluster_event_listener(Mod, Fun, Args) Pid. +% Mem3 cluster callbacks + +cluster_unstable(Server) -> + couch_replicator_notifier:notify({cluster, unstable}), + couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0), + couch_log:notice("~s : cluster unstable", [?MODULE]), + gen_server:cast(Server, cluster_unstable), + Server. + +cluster_stable(Server) -> + couch_replicator_notifier:notify({cluster, stable}), + couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1), + couch_log:notice("~s : cluster stable", [?MODULE]), + gen_server:cast(Server, cluster_stable), + Server. + + % gen_server callbacks init([]) -> - net_kernel:monitor_nodes(true), ok = config:listen_for_changes(?MODULE, nil), Period = abs(config:get_integer("replicator", "cluster_quiet_period", ?DEFAULT_QUIET_PERIOD)), StartPeriod = abs(config:get_integer("replicator", "cluster_start_period", ?DEFAULT_START_PERIOD)), - couch_log:debug("Initialized clustering gen_server ~w", [self()]), couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0), - {ok, #state{ - start_time = os:timestamp(), - last_change = os:timestamp(), - period = Period, - start_period = StartPeriod, - timer = new_timer(StartPeriod) - }}. + {ok, Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(), StartPeriod, + Period), + {ok, #state{mem3_cluster_pid = Mem3Cluster, cluster_stable = false}}. terminate(_Reason, _State) -> ok. -handle_call(is_stable, _From, State) -> - {reply, is_stable(State), State}. +handle_call(is_stable, _From, #state{cluster_stable = IsStable} = State) -> + {reply, IsStable, State}. -handle_cast({set_period, QuietPeriod}, State) -> - {noreply, State#state{period = QuietPeriod}}. +handle_cast({set_period, Period}, #state{mem3_cluster_pid = Pid} = State) -> + ok = mem3_cluster:set_period(Pid, Period), + {noreply, State}; +handle_cast(cluster_stable, State) -> + {noreply, State#state{cluster_stable = true}}; -handle_info({nodeup, Node}, State) -> - Timer = new_timer(interval(State)), - couch_replicator_notifier:notify({cluster, unstable}), - couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0), - couch_log:notice("~s : nodeup ~s, cluster unstable", [?MODULE, Node]), - {noreply, State#state{last_change = os:timestamp(), timer = Timer}}; +handle_cast(cluster_unstable, State) -> + {noreply, State#state{cluster_stable = false}}. -handle_info({nodedown, Node}, State) -> - Timer = new_timer(interval(State)), - couch_replicator_notifier:notify({cluster, unstable}), - couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0), - couch_log:notice("~s : nodedown ~s, cluster unstable", [?MODULE, Node]), - {noreply, State#state{last_change = os:timestamp(), timer = Timer}}; - -handle_info(stability_check, State) -> - erlang:cancel_timer(State#state.timer), - case is_stable(State) of - true -> - couch_replicator_notifier:notify({cluster, stable}), - couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1), - couch_log:notice("~s : publish cluster `stable` event", [?MODULE]), - {noreply, State}; - false -> - Timer = new_timer(interval(State)), - {noreply, State#state{timer = Timer}} - end; handle_info(restart_config_listener, State) -> ok = config:listen_for_changes(?MODULE, nil), @@ -185,41 +180,6 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions --spec new_timer(non_neg_integer()) -> reference(). -new_timer(IntervalSec) -> - erlang:send_after(IntervalSec * 1000, self(), stability_check). - - -% For the first Period seconds after node boot we check cluster stability every -% StartPeriod seconds. Once the initial Period seconds have passed we continue -% to monitor once every Period seconds --spec interval(#state{}) -> non_neg_integer(). -interval(#state{period = Period, start_period = StartPeriod, - start_time = T0}) -> - case now_diff_sec(T0) > Period of - true -> - % Normal operation - Period; - false -> - % During startup - StartPeriod - end. - - --spec is_stable(#state{}) -> boolean(). -is_stable(#state{last_change = TS} = State) -> - now_diff_sec(TS) > interval(State). - - --spec now_diff_sec(erlang:timestamp()) -> non_neg_integer(). -now_diff_sec(Time) -> - case timer:now_diff(os:timestamp(), Time) of - USec when USec < 0 -> - 0; - USec when USec >= 0 -> - USec / 1000000 - end. - handle_config_change("replicator", "cluster_quiet_period", V, _, S) -> ok = gen_server:cast(?MODULE, {set_period, list_to_integer(V)}), diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl index a49d692d9..9d844b9e7 100644 --- a/src/couch_replicator/src/couch_replicator_docs.erl +++ b/src/couch_replicator/src/couch_replicator_docs.erl @@ -121,7 +121,7 @@ update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) -> ok. --spec ensure_rep_db_exists() -> {ok, #db{}}. +-spec ensure_rep_db_exists() -> {ok, Db::any()}. ensure_rep_db_exists() -> Db = case couch_db:open_int(?REP_DB_NAME, [?CTX, sys_db, nologifmissing]) of @@ -466,7 +466,7 @@ make_options(Props) -> DefBatchSize = config:get("replicator", "worker_batch_size", "500"), DefConns = config:get("replicator", "http_connections", "20"), DefTimeout = config:get("replicator", "connection_timeout", "30000"), - DefRetries = config:get("replicator", "retries_per_request", "10"), + DefRetries = config:get("replicator", "retries_per_request", "5"), UseCheckpoints = config:get("replicator", "use_checkpoints", "true"), DefCheckpointInterval = config:get("replicator", "checkpoint_interval", "30000"), @@ -621,11 +621,14 @@ ssl_verify_options(false) -> [{verify, verify_none}]. --spec before_doc_update(#doc{}, #db{}) -> #doc{}. +-spec before_doc_update(#doc{}, Db::any()) -> #doc{}. before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) -> Doc; -before_doc_update(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) -> - #user_ctx{roles = Roles, name = Name} = UserCtx, +before_doc_update(#doc{body = {Body}} = Doc, Db) -> + #user_ctx{ + roles = Roles, + name = Name + } = couch_db:get_user_ctx(Db), case lists:member(<<"_replicator">>, Roles) of true -> Doc; @@ -649,11 +652,11 @@ before_doc_update(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) -> end. --spec after_doc_read(#doc{}, #db{}) -> #doc{}. +-spec after_doc_read(#doc{}, Db::any()) -> #doc{}. after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) -> Doc; -after_doc_read(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) -> - #user_ctx{name = Name} = UserCtx, +after_doc_read(#doc{body = {Body}} = Doc, Db) -> + #user_ctx{name = Name} = couch_db:get_user_ctx(Db), case (catch couch_db:check_is_admin(Db)) of ok -> Doc; diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl index 62cfdf267..e7067622b 100644 --- a/src/couch_replicator/src/couch_replicator_ids.erl +++ b/src/couch_replicator/src/couch_replicator_ids.erl @@ -78,7 +78,11 @@ replication_id(#rep{user_ctx = UserCtx} = Rep, 1) -> -spec convert([_] | binary() | {string(), string()}) -> {string(), string()}. convert(Id) when is_list(Id) -> convert(?l2b(Id)); -convert(Id) when is_binary(Id) -> +convert(Id0) when is_binary(Id0) -> + % Spaces can result from mochiweb incorrectly unquoting + characters from + % the URL path. So undo the incorrect parsing here to avoid forcing + % users to url encode + characters. + Id = binary:replace(Id0, <<" ">>, <<"+">>, [global]), lists:splitwith(fun(Char) -> Char =/= $+ end, ?b2l(Id)); convert({BaseId, Ext} = Id) when is_list(BaseId), is_list(Ext) -> Id. @@ -222,6 +226,16 @@ get_non_default_port(_Schema, Port) -> -include_lib("eunit/include/eunit.hrl"). + +replication_id_convert_test_() -> + [?_assertEqual(Expected, convert(Id)) || {Expected, Id} <- [ + {{"abc", ""}, "abc"}, + {{"abc", ""}, <<"abc">>}, + {{"abc", "+x+y"}, <<"abc+x+y">>}, + {{"abc", "+x+y"}, {"abc", "+x+y"}}, + {{"abc", "+x+y"}, <<"abc x y">>} + ]]. + http_v4_endpoint_test_() -> [?_assertMatch({remote, User, Host, Port, Path, HeadersNoAuth, undefined}, get_v4_endpoint(nil, #httpdb{url = Url, headers = Headers})) || diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index 6a5722521..e7ce576f4 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -95,8 +95,6 @@ start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) -> case gen_server:start_link(ServerName, ?MODULE, Rep, []) of {ok, Pid} -> - couch_log:notice("starting new replication `~s` at ~p (`~s` -> `~s`)", - [RepChildId, Pid, Source, Target]), {ok, Pid}; {error, Reason} -> couch_log:warning("failed to start replication `~s` (`~s` -> `~s`)", @@ -184,24 +182,7 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) -> % cancel_replication/1) and then start the replication again, but this is % unfortunately not immune to race conditions. - couch_log:notice("Replication `~p` is using:~n" - "~c~p worker processes~n" - "~ca worker batch size of ~p~n" - "~c~p HTTP connections~n" - "~ca connection timeout of ~p milliseconds~n" - "~c~p retries per request~n" - "~csocket options are: ~s~s", - [BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t, - MaxConns, $\t, get_value(connection_timeout, Options), - $\t, get_value(retries, Options), - $\t, io_lib:format("~p", [get_value(socket_options, Options)]), - case StartSeq of - ?LOWEST_SEQ -> - ""; - _ -> - io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq]) - end]), - + log_replication_start(State), couch_log:debug("Worker pids are: ~p", [Workers]), doc_update_triggered(Rep), @@ -254,16 +235,21 @@ handle_call({report_seq_done, Seq, StatsInc}, From, update_task(NewState), {noreply, NewState}. - -handle_cast({db_compacted, DbName}, - #rep_state{source = #db{name = DbName} = Source} = State) -> - {ok, NewSource} = couch_db:reopen(Source), - {noreply, State#rep_state{source = NewSource}}; - -handle_cast({db_compacted, DbName}, - #rep_state{target = #db{name = DbName} = Target} = State) -> - {ok, NewTarget} = couch_db:reopen(Target), - {noreply, State#rep_state{target = NewTarget}}; +handle_cast({db_compacted, DbName}, State) -> + #rep_state{ + source = Source, + target = Target + } = State, + SourceName = couch_replicator_utils:local_db_name(Source), + TargetName = couch_replicator_utils:local_db_name(Target), + case DbName of + SourceName -> + {ok, NewSource} = couch_db:reopen(Source), + {noreply, State#rep_state{source = NewSource}}; + TargetName -> + {ok, NewTarget} = couch_db:reopen(Target), + {noreply, State#rep_state{target = NewTarget}} + end; handle_cast(checkpoint, State) -> case do_checkpoint(State) of @@ -358,10 +344,11 @@ handle_info(timeout, InitArgs) -> {stop, {shutdown, max_backoff}, {error, InitArgs}}; Class:Error -> ShutdownReason = {error, replication_start_error(Error)}, + StackTop2 = lists:sublist(erlang:get_stacktrace(), 2), % Shutdown state is a hack as it is not really the state of the % gen_server (it failed to initialize, so it doesn't have one). % Shutdown state is used to pass extra info about why start failed. - ShutdownState = {error, Class, erlang:get_stacktrace(), InitArgs}, + ShutdownState = {error, Class, StackTop2, InitArgs}, {stop, {shutdown, ShutdownReason}, ShutdownState} end. @@ -394,11 +381,20 @@ terminate({shutdown, max_backoff}, {error, InitArgs}) -> couch_replicator_notifier:notify({error, RepId, max_backoff}); terminate({shutdown, {error, Error}}, {error, Class, Stack, InitArgs}) -> - #rep{id=RepId} = InitArgs, + #rep{ + id = {BaseId, Ext} = RepId, + source = Source0, + target = Target0, + doc_id = DocId, + db_name = DbName + } = InitArgs, + Source = couch_replicator_api_wrap:db_uri(Source0), + Target = couch_replicator_api_wrap:db_uri(Target0), + RepIdStr = BaseId ++ Ext, + Msg = "~p:~p: Replication ~s failed to start ~p -> ~p doc ~p:~p stack:~p", + couch_log:error(Msg, [Class, Error, RepIdStr, Source, Target, DbName, + DocId, Stack]), couch_stats:increment_counter([couch_replicator, failed_starts]), - CleanInitArgs = rep_strip_creds(InitArgs), - couch_log:error("~p:~p: Replication failed to start for args ~p: ~p", - [Class, Error, CleanInitArgs, Stack]), couch_replicator_notifier:notify({error, RepId, Error}); terminate({shutdown, max_backoff}, State) -> @@ -436,7 +432,37 @@ code_change(_OldVsn, #rep_state{}=State, _Extra) -> format_status(_Opt, [_PDict, State]) -> - [{data, [{"State", state_strip_creds(State)}]}]. + #rep_state{ + source = Source, + target = Target, + rep_details = RepDetails, + start_seq = StartSeq, + source_seq = SourceSeq, + committed_seq = CommitedSeq, + current_through_seq = ThroughSeq, + highest_seq_done = HighestSeqDone, + session_id = SessionId + } = state_strip_creds(State), + #rep{ + id = RepId, + options = Options, + doc_id = DocId, + db_name = DbName + } = RepDetails, + [ + {rep_id, RepId}, + {source, couch_replicator_api_wrap:db_uri(Source)}, + {target, couch_replicator_api_wrap:db_uri(Target)}, + {db_name, DbName}, + {doc_id, DocId}, + {options, Options}, + {session_id, SessionId}, + {start_seq, StartSeq}, + {source_seq, SourceSeq}, + {committed_seq, CommitedSeq}, + {current_through_seq, ThroughSeq}, + {highest_seq_done, HighestSeqDone} + ]. startup_jitter() -> @@ -910,10 +936,10 @@ has_session_id(SessionId, [{Props} | Rest]) -> end. -db_monitor(#db{} = Db) -> - couch_db:monitor(Db); -db_monitor(_HttpDb) -> - nil. +db_monitor(#httpdb{}) -> + nil; +db_monitor(Db) -> + couch_db:monitor(Db). get_pending_count(St) -> @@ -984,5 +1010,99 @@ replication_start_error({unauthorized, DbUri}) -> {unauthorized, <<"unauthorized to access or create database ", DbUri/binary>>}; replication_start_error({db_not_found, DbUri}) -> {db_not_found, <<"could not open ", DbUri/binary>>}; +replication_start_error({http_request_failed, _Method, Url0, + {error, {error, {conn_failed, {error, nxdomain}}}}}) -> + Url = ?l2b(couch_util:url_strip_password(Url0)), + {nxdomain, <<"could not resolve ", Url/binary>>}; +replication_start_error({http_request_failed, Method0, Url0, + {error, {code, Code}}}) when is_integer(Code) -> + Url = ?l2b(couch_util:url_strip_password(Url0)), + Method = ?l2b(Method0), + {http_error_code, Code, <<Method/binary, " ", Url/binary>>}; replication_start_error(Error) -> Error. + + +log_replication_start(#rep_state{rep_details = Rep} = RepState) -> + #rep{ + id = {BaseId, Ext}, + doc_id = DocId, + db_name = DbName, + options = Options + } = Rep, + Id = BaseId ++ Ext, + Workers = get_value(worker_processes, Options), + BatchSize = get_value(worker_batch_size, Options), + #rep_state{ + source_name = Source, % credentials already stripped + target_name = Target, % credentials already stripped + session_id = Sid + } = RepState, + From = case DbName of + ShardName when is_binary(ShardName) -> + io_lib:format("from doc ~s:~s", [mem3:dbname(ShardName), DocId]); + _ -> + "from _replicate endpoint" + end, + Msg = "Starting replication ~s (~s -> ~s) ~s worker_procesess:~p" + " worker_batch_size:~p session_id:~s", + couch_log:notice(Msg, [Id, Source, Target, From, Workers, BatchSize, Sid]). + + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + + +replication_start_error_test() -> + ?assertEqual({unauthorized, <<"unauthorized to access or create database" + " http://x/y">>}, replication_start_error({unauthorized, + <<"http://x/y">>})), + ?assertEqual({db_not_found, <<"could not open http://x/y">>}, + replication_start_error({db_not_found, <<"http://x/y">>})), + ?assertEqual({nxdomain,<<"could not resolve http://x/y">>}, + replication_start_error({http_request_failed, "GET", "http://x/y", + {error, {error, {conn_failed, {error, nxdomain}}}}})), + ?assertEqual({http_error_code,503,<<"GET http://x/y">>}, + replication_start_error({http_request_failed, "GET", "http://x/y", + {error, {code, 503}}})). + + +scheduler_job_format_status_test() -> + Source = <<"http://u:p@h1/d1">>, + Target = <<"http://u:p@h2/d2">>, + Rep = #rep{ + id = {"base", "+ext"}, + source = couch_replicator_docs:parse_rep_db(Source, [], []), + target = couch_replicator_docs:parse_rep_db(Target, [], []), + options = [{create_target, true}], + doc_id = <<"mydoc">>, + db_name = <<"mydb">> + }, + State = #rep_state{ + rep_details = Rep, + source = Rep#rep.source, + target = Rep#rep.target, + session_id = <<"a">>, + start_seq = <<"1">>, + source_seq = <<"2">>, + committed_seq = <<"3">>, + current_through_seq = <<"4">>, + highest_seq_done = <<"5">> + }, + Format = format_status(opts_ignored, [pdict, State]), + ?assertEqual("http://u:*****@h1/d1/", proplists:get_value(source, Format)), + ?assertEqual("http://u:*****@h2/d2/", proplists:get_value(target, Format)), + ?assertEqual({"base", "+ext"}, proplists:get_value(rep_id, Format)), + ?assertEqual([{create_target, true}], proplists:get_value(options, Format)), + ?assertEqual(<<"mydoc">>, proplists:get_value(doc_id, Format)), + ?assertEqual(<<"mydb">>, proplists:get_value(db_name, Format)), + ?assertEqual(<<"a">>, proplists:get_value(session_id, Format)), + ?assertEqual(<<"1">>, proplists:get_value(start_seq, Format)), + ?assertEqual(<<"2">>, proplists:get_value(source_seq, Format)), + ?assertEqual(<<"3">>, proplists:get_value(committed_seq, Format)), + ?assertEqual(<<"4">>, proplists:get_value(current_through_seq, Format)), + ?assertEqual(<<"5">>, proplists:get_value(highest_seq_done, Format)). + + +-endif. diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl index 05836d483..01881e423 100644 --- a/src/couch_replicator/src/couch_replicator_utils.erl +++ b/src/couch_replicator/src/couch_replicator_utils.erl @@ -16,6 +16,7 @@ parse_rep_doc/2, open_db/1, close_db/1, + local_db_name/1, start_db_compaction_notifier/2, stop_db_compaction_notifier/1, replication_id/2, @@ -35,6 +36,7 @@ -include_lib("couch/include/couch_db.hrl"). -include("couch_replicator.hrl"). +-include("couch_replicator_api_wrap.hrl"). -import(couch_util, [ get_value/2, @@ -42,26 +44,35 @@ ]). -open_db(#db{name = Name, user_ctx = UserCtx}) -> - {ok, Db} = couch_db:open(Name, [{user_ctx, UserCtx} | []]), - Db; -open_db(HttpDb) -> - HttpDb. +open_db(#httpdb{} = HttpDb) -> + HttpDb; +open_db(Db) -> + DbName = couch_db:name(Db), + UserCtx = couch_db:get_user_ctx(Db), + {ok, NewDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]), + NewDb. -close_db(#db{} = Db) -> - couch_db:close(Db); -close_db(_HttpDb) -> - ok. +close_db(#httpdb{}) -> + ok; +close_db(Db) -> + couch_db:close(Db). + + +local_db_name(#httpdb{}) -> + undefined; +local_db_name(Db) -> + couch_db:name(Db). -start_db_compaction_notifier(#db{name = DbName}, Server) -> +start_db_compaction_notifier(#httpdb{}, _) -> + nil; +start_db_compaction_notifier(Db, Server) -> + DbName = couch_db:name(Db), {ok, Pid} = couch_event:link_listener( ?MODULE, handle_db_event, Server, [{dbname, DbName}] ), - Pid; -start_db_compaction_notifier(_, _) -> - nil. + Pid. stop_db_compaction_notifier(nil) -> diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index b52640d5d..344b8f286 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -67,16 +67,16 @@ -start_link(Cp, #db{} = Source, Target, ChangesManager, _MaxConns) -> +start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns) -> + gen_server:start_link( + ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []); + +start_link(Cp, Source, Target, ChangesManager, _MaxConns) -> Pid = spawn_link(fun() -> erlang:put(last_stats_report, now()), queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager) end), - {ok, Pid}; - -start_link(Cp, Source, Target, ChangesManager, MaxConns) -> - gen_server:start_link( - ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []). + {ok, Pid}. init({Cp, Source, Target, ChangesManager, MaxConns}) -> @@ -139,15 +139,23 @@ handle_call(flush, {Pid, _} = From, {noreply, State2#state{flush_waiter = From}}. -handle_cast({db_compacted, DbName}, - #state{source = #db{name = DbName} = Source} = State) -> - {ok, NewSource} = couch_db:reopen(Source), - {noreply, State#state{source = NewSource}}; - -handle_cast({db_compacted, DbName}, - #state{target = #db{name = DbName} = Target} = State) -> - {ok, NewTarget} = couch_db:reopen(Target), - {noreply, State#state{target = NewTarget}}; +handle_cast({db_compacted, DbName} = Msg, #state{} = State) -> + #state{ + source = Source, + target = Target + } = State, + SourceName = couch_replicator_utils:local_db_name(Source), + TargetName = couch_replicator_utils:local_db_name(Target), + case DbName of + SourceName -> + {ok, NewSource} = couch_db:reopen(Source), + {noreply, State#state{source = NewSource}}; + TargetName -> + {ok, NewTarget} = couch_db:reopen(Target), + {noreply, State#state{target = NewTarget}}; + _Else -> + {stop, {unexpected_async_call, Msg}, State} + end; handle_cast(Msg, State) -> {stop, {unexpected_async_call, Msg}, State}. @@ -227,15 +235,15 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) -> Target2 = open_db(Target), {IdRevs, Stats0} = find_missing(Changes, Target2), case Source of - #db{} -> - Source2 = open_db(Source), - Stats = local_process_batch( - IdRevs, Cp, Source2, Target2, #batch{}, Stats0), - close_db(Source2); #httpdb{} -> ok = gen_server:call(Parent, {add_stats, Stats0}, infinity), remote_process_batch(IdRevs, Parent), - {ok, Stats} = gen_server:call(Parent, flush, infinity) + {ok, Stats} = gen_server:call(Parent, flush, infinity); + _Db -> + Source2 = open_db(Source), + Stats = local_process_batch( + IdRevs, Cp, Source2, Target2, #batch{}, Stats0), + close_db(Source2) end, close_db(Target2), ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity), @@ -252,7 +260,7 @@ local_process_batch([], Cp, Source, Target, #batch{docs = Docs, size = Size}, St case Target of #httpdb{} -> couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]); - #db{} -> + _Db -> couch_log:debug("Worker flushing doc batch of ~p docs", [Size]) end, Stats2 = flush_docs(Target, Docs), @@ -367,7 +375,7 @@ spawn_writer(Target, #batch{docs = DocList, size = Size}) -> case {Target, Size > 0} of {#httpdb{}, true} -> couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]); - {#db{}, true} -> + {_Db, true} -> couch_log:debug("Worker flushing doc batch of ~p docs", [Size]); _ -> ok @@ -429,7 +437,7 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) -> end end; -maybe_flush_docs(#db{} = Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) -> +maybe_flush_docs(Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) -> case SizeAcc + 1 of SizeAcc2 when SizeAcc2 >= ?DOC_BUFFER_LEN -> couch_log:debug("Worker flushing doc batch of ~p docs", [SizeAcc2]), diff --git a/src/couch_replicator/test/couch_replicator_compact_tests.erl b/src/couch_replicator/test/couch_replicator_compact_tests.erl index a98feee66..f06a684b5 100644 --- a/src/couch_replicator/test/couch_replicator_compact_tests.erl +++ b/src/couch_replicator/test/couch_replicator_compact_tests.erl @@ -87,8 +87,8 @@ should_all_processes_be_alive(RepPid, Source, Target) -> {ok, SourceDb} = reopen_db(Source), {ok, TargetDb} = reopen_db(Target), ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(SourceDb#db.main_pid)), - ?assert(is_process_alive(TargetDb#db.main_pid)) + ?assert(is_process_alive(couch_db:get_pid(SourceDb))), + ?assert(is_process_alive(couch_db:get_pid(TargetDb))) end). should_run_replication(RepPid, RepId, Source, Target) -> @@ -164,12 +164,12 @@ should_populate_and_compact(RepPid, Source, Target, BatchSize, Rounds) -> compact_db("source", SourceDb), ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(SourceDb#db.main_pid)), + ?assert(is_process_alive(couch_db:get_pid(SourceDb))), wait_for_compaction("source", SourceDb), compact_db("target", TargetDb), ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(TargetDb#db.main_pid)), + ?assert(is_process_alive(couch_db:get_pid(TargetDb))), wait_for_compaction("target", TargetDb), {ok, SourceDb2} = reopen_db(SourceDb), @@ -180,14 +180,14 @@ should_populate_and_compact(RepPid, Source, Target, BatchSize, Rounds) -> compact_db("source", SourceDb2), ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(SourceDb2#db.main_pid)), + ?assert(is_process_alive(couch_db:get_pid(SourceDb2))), pause_writer(Writer), wait_for_compaction("source", SourceDb2), resume_writer(Writer), compact_db("target", TargetDb2), ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(TargetDb2#db.main_pid)), + ?assert(is_process_alive(couch_db:get_pid(TargetDb2))), pause_writer(Writer), wait_for_compaction("target", TargetDb2), resume_writer(Writer) @@ -263,14 +263,16 @@ should_compare_databases(Source, Target) -> reopen_db({remote, Db}) -> reopen_db(Db); -reopen_db(#db{name=DbName}) -> - reopen_db(DbName); -reopen_db(DbName) -> +reopen_db(DbName) when is_binary(DbName) -> {ok, Db} = couch_db:open_int(DbName, []), ok = couch_db:close(Db), - {ok, Db}. + {ok, Db}; +reopen_db(Db) -> + reopen_db(couch_db:name(Db)). -compact_db(Type, #db{name = Name}) -> + +compact_db(Type, Db0) -> + Name = couch_db:name(Db0), {ok, Db} = couch_db:open_int(Name, []), {ok, CompactPid} = couch_db:start_compact(Db), MonRef = erlang:monitor(process, CompactPid), @@ -405,7 +407,8 @@ stop_writer(Pid) -> {reason, "Timeout stopping source database writer"}]}) end. -writer_loop(#db{name = DbName}, Parent, Counter) -> +writer_loop(Db0, Parent, Counter) -> + DbName = couch_db:name(Db0), {ok, Data} = file:read_file(?ATTFILE), maybe_pause(Parent, Counter), Doc = couch_doc:from_json_obj({[ diff --git a/src/couch_stats/src/couch_stats.app.src b/src/couch_stats/src/couch_stats.app.src index d60ce1c0a..6339a0f1d 100644 --- a/src/couch_stats/src/couch_stats.app.src +++ b/src/couch_stats/src/couch_stats.app.src @@ -16,7 +16,5 @@ {registered, [couch_stats_aggregator, couch_stats_process_tracker]}, {applications, [kernel, stdlib, folsom, couch_log]}, {mod, {couch_stats_app, []}}, - {env, [ - {collection_interval, 10} - ]} + {env, []} ]}. diff --git a/src/couch_stats/src/couch_stats.erl b/src/couch_stats/src/couch_stats.erl index e02da29f1..59175f7a8 100644 --- a/src/couch_stats/src/couch_stats.erl +++ b/src/couch_stats/src/couch_stats.erl @@ -29,6 +29,10 @@ update_gauge/2 ]). + +-include("couch_stats.hrl"). + + -type response() :: ok | {error, unknown_metric}. -type stat() :: {any(), [{atom(), any()}]}. @@ -56,7 +60,7 @@ new(counter, Name) -> {error, Name, metric_already_exists} -> {error, metric_exists} end; new(histogram, Name) -> - {ok, Time} = application:get_env(couch_stats, collection_interval), + Time = config:get_integer("stats", "interval", ?DEFAULT_INTERVAL), case folsom_metrics:new_histogram(Name, slide_uniform, {Time, 1024}) of ok -> ok; {error, Name, metric_already_exists} -> {error, metric_exists} diff --git a/src/couch_stats/src/couch_stats.hrl b/src/couch_stats/src/couch_stats.hrl new file mode 100644 index 000000000..3cffe99f1 --- /dev/null +++ b/src/couch_stats/src/couch_stats.hrl @@ -0,0 +1,14 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-define(DEFAULT_INTERVAL, 10). +-define(RELOAD_INTERVAL, 600). diff --git a/src/couch_stats/src/couch_stats_aggregator.erl b/src/couch_stats/src/couch_stats_aggregator.erl index 0f6c9dd83..17bd6fc33 100644 --- a/src/couch_stats/src/couch_stats_aggregator.erl +++ b/src/couch_stats/src/couch_stats_aggregator.erl @@ -30,6 +30,9 @@ terminate/2 ]). + +-include("couch_stats.hrl"). + -record(st, { descriptions, stats, @@ -52,11 +55,9 @@ start_link() -> init([]) -> {ok, Descs} = reload_metrics(), - Interval = case application:get_env(couch_stats, collection_interval) of - {ok, I} -> I * 1000 - end, - {ok, CT} = timer:send_interval(Interval, self(), collect), - {ok, RT} = timer:send_interval(600000, self(), reload), + Interval = config:get_integer("stats", "interval", ?DEFAULT_INTERVAL), + {ok, CT} = timer:send_interval(Interval * 1000, self(), collect), + {ok, RT} = timer:send_interval(?RELOAD_INTERVAL * 1000, self(), reload), {ok, #st{descriptions=Descs, stats=[], collect_timer=CT, reload_timer=RT}}. handle_call(fetch, _from, #st{stats = Stats}=State) -> diff --git a/src/fabric/include/couch_db_tmp.hrl b/src/fabric/include/couch_db_tmp.hrl deleted file mode 100644 index cd3a047d4..000000000 --- a/src/fabric/include/couch_db_tmp.hrl +++ /dev/null @@ -1,296 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --define(LOCAL_DOC_PREFIX, "_local/"). --define(DESIGN_DOC_PREFIX0, "_design"). --define(DESIGN_DOC_PREFIX, "_design/"). - --define(MIN_STR, <<"">>). --define(MAX_STR, <<255>>). % illegal utf string - --define(JSON_ENCODE(V), couch_util:json_encode(V)). --define(JSON_DECODE(V), couch_util:json_decode(V)). - --define(b2l(V), binary_to_list(V)). --define(l2b(V), list_to_binary(V)). - --define(DEFAULT_ATTACHMENT_CONTENT_TYPE, <<"application/octet-stream">>). - --define(LOG_DEBUG(Format, Args), couch_log:debug(Format, Args)). --define(LOG_INFO(Format, Args), couch_log:notice(Format, Args)). --define(LOG_ERROR(Format, Args), couch_log:error(Format, Args)). - --record(rev_info, - { - rev, - seq = 0, - deleted = false, - body_sp = nil % stream pointer - }). - --record(doc_info, - { - id = <<"">>, - high_seq = 0, - revs = [] % rev_info - }). - --record(full_doc_info, - {id = <<"">>, - update_seq = 0, - deleted = false, - data_size = 0, - rev_tree = [] - }). - --record(httpd, - {mochi_req, - peer, - method, - path_parts, - db_url_handlers, - user_ctx, - req_body = undefined, - design_url_handlers, - auth, - default_fun, - url_handlers - }). - - --record(doc, - { - id = <<"">>, - revs = {0, []}, - - % the json body object. - body = {[]}, - - atts = [], % attachments - - deleted = false, - - % key/value tuple of meta information, provided when using special options: - % couch_db:open_doc(Db, Id, Options). - meta = [] - }). - - --record(att, - { - name, - type, - att_len, - disk_len, % length of the attachment in its identity form - % (that is, without a content encoding applied to it) - % differs from att_len when encoding /= identity - md5= <<>>, - revpos=0, - data, - encoding=identity % currently supported values are: - % identity, gzip - % additional values to support in the future: - % deflate, compress - }). - - --record(user_ctx, - { - name=null, - roles=[], - handler - }). - -% This should be updated anytime a header change happens that requires more -% than filling in new defaults. -% -% As long the changes are limited to new header fields (with inline -% defaults) added to the end of the record, then there is no need to increment -% the disk revision number. -% -% if the disk revision is incremented, then new upgrade logic will need to be -% added to couch_db_updater:init_db. - --define(LATEST_DISK_VERSION, 5). - --record(db_header, - {disk_version = ?LATEST_DISK_VERSION, - update_seq = 0, - unused = 0, - id_tree_state = nil, - seq_tree_state = nil, - local_tree_state = nil, - purge_seq = 0, - purged_docs = nil, - security_ptr = nil, - revs_limit = 1000 - }). - --record(db, - {main_pid = nil, - update_pid = nil, - compactor_pid = nil, - instance_start_time, % number of microsecs since jan 1 1970 as a binary string - fd, - fd_monitor, - header = #db_header{}, - committed_update_seq, - id_tree, - seq_tree, - local_tree, - update_seq, - name, - filepath, - validate_doc_funs = undefined, - security = [], - security_ptr = nil, - user_ctx = #user_ctx{}, - waiting_delayed_commit = nil, - revs_limit = 1000, - fsync_options = [], - is_sys_db = false - }). - - --record(view_query_args, { - start_key, - end_key, - start_docid = ?MIN_STR, - end_docid = ?MAX_STR, - - direction = fwd, - inclusive_end=true, % aka a closed-interval - - limit = 10000000000, % Huge number to simplify logic - skip = 0, - - group_level = 0, - - view_type = nil, - include_docs = false, - stale = false, - multi_get = false, - callback = nil, - list = nil, - keys = nil, - sorted = true, - extra = [] -}). - --record(view_fold_helper_funs, { - reduce_count, - passed_end, - start_response, - send_row -}). - --record(reduce_fold_helper_funs, { - start_response, - send_row -}). - --record(extern_resp_args, { - code = 200, - stop = false, - data = <<>>, - ctype = "application/json", - headers = [], - json = nil -}). - --record(group, { - sig=nil, - dbname, - fd=nil, - name, - def_lang, - design_options=[], - views, - id_btree=nil, - current_seq=0, - purge_seq=0, - query_server=nil, - waiting_delayed_commit=nil, - atts=[] - }). - --record(view, - {id_num, - map_names=[], - def, - btree=nil, - reduce_funs=[], - dbcopies=[], - options=[] - }). - --record(index_header, - {seq=0, - purge_seq=0, - id_btree_state=nil, - view_states=nil - }). - --record(http_db, { - url, - auth = [], - resource = "", - headers = [ - {"User-Agent", "CouchDB/"++couch:version()}, - {"Accept", "application/json"}, - {"Accept-Encoding", "gzip"} - ], - qs = [], - method = get, - body = nil, - options = [ - {response_format,binary}, - {inactivity_timeout, 30000} - ], - retries = 10, - pause = 500, - conn = nil -}). - -% small value used in revision trees to indicate the revision isn't stored --define(REV_MISSING, []). - --record(changes_args, { - feed = "normal", - dir = fwd, - since = "0", - limit = 1000000000000000, - style = main_only, - heartbeat, - timeout, - filter, - include_docs = false -}). - --record(proc, { - pid, - lang, - client = nil, - ddoc_keys = [], - prompt_fun, - set_timeout_fun, - stop_fun, - data_fun -}). - --record(leaf, { - deleted, - ptr, - seq, - size = 0, - atts = [] -}). diff --git a/src/fabric/rebar.config b/src/fabric/rebar.config index ccfb9b435..df35ac639 100644 --- a/src/fabric/rebar.config +++ b/src/fabric/rebar.config @@ -11,5 +11,5 @@ % the License. {deps, [ - {meck, ".*", {git, "https://github.com/apache/couchdb-meck.git", {tag, "0.8.2"}}} + {meck, ".*", {git, "https://github.com/apache/couchdb-meck.git", {tag, "0.8.8"}}} ]}. diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl index 1dcdb0e00..4a0727131 100644 --- a/src/fabric/src/fabric.erl +++ b/src/fabric/src/fabric.erl @@ -38,7 +38,7 @@ -include_lib("fabric/include/fabric.hrl"). --type dbname() :: (iodata() | #db{}). +-type dbname() :: (iodata() | tuple()). -type docid() :: iodata(). -type revision() :: {integer(), binary()}. -type callback() :: fun((any(), any()) -> {ok | stop, any()}). @@ -483,10 +483,12 @@ dbname(DbName) when is_list(DbName) -> list_to_binary(DbName); dbname(DbName) when is_binary(DbName) -> DbName; -dbname(#db{name=Name}) -> - Name; -dbname(DbName) -> - erlang:error({illegal_database_name, DbName}). +dbname(Db) -> + try + couch_db:name(Db) + catch error:badarg -> + erlang:error({illegal_database_name, Db}) + end. name(Thing) -> couch_util:to_binary(Thing). diff --git a/src/fabric/src/fabric_db_info.erl b/src/fabric/src/fabric_db_info.erl index ab93e4736..98e8e52e4 100644 --- a/src/fabric/src/fabric_db_info.erl +++ b/src/fabric/src/fabric_db_info.erl @@ -22,7 +22,8 @@ go(DbName) -> Workers = fabric_util:submit_jobs(Shards, get_db_info, []), RexiMon = fabric_util:create_monitors(Shards), Fun = fun handle_message/3, - Acc0 = {fabric_dict:init(Workers, nil), []}, + {ok, ClusterInfo} = get_cluster_info(Shards), + Acc0 = {fabric_dict:init(Workers, nil), [{cluster, ClusterInfo}]}, try case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of {ok, Acc} -> {ok, Acc}; @@ -104,6 +105,8 @@ merge_results(Info) -> [{other, {merge_other_results(X)}} | Acc]; (disk_format_version, X, Acc) -> [{disk_format_version, lists:max(X)} | Acc]; + (cluster, [X], Acc) -> + [{cluster, {X}} | Acc]; (_, _, Acc) -> Acc end, [{instance_start_time, <<"0">>}], Dict). @@ -127,3 +130,46 @@ merge_object(Objects) -> (Key, X, Acc) -> [{Key, lists:sum(X)} | Acc] end, [], Dict). + +get_cluster_info(Shards) -> + Dict = lists:foldl(fun(#shard{range = R}, Acc) -> + dict:update_counter(R, 1, Acc) + end, dict:new(), Shards), + Q = dict:size(Dict), + N = dict:fold(fun(_, X, Acc) -> max(X, Acc) end, 0, Dict), + %% defaults as per mem3:quorum/1 + WR = N div 2 + 1, + {ok, [{q, Q}, {n, N}, {w, WR}, {r, WR}]}. + + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +get_cluster_info_test_() -> + { + setup, + fun setup/0, + fun get_cluster_info_test_generator/1 + }. + + +setup() -> + Quorums = [1, 2, 3], + Shards = [1, 3, 5, 8, 12, 24], + [{N, Q} || N <- Quorums, Q <- Shards]. + +get_cluster_info_test_generator([]) -> + []; +get_cluster_info_test_generator([{N, Q} | Rest]) -> + {generator, + fun() -> + Nodes = lists:seq(1, 8), + Shards = mem3_util:create_partition_map(<<"foo">>, N, Q, Nodes), + {ok, Info} = get_cluster_info(Shards), + [ + ?_assertEqual(N, couch_util:get_value(n, Info)), + ?_assertEqual(Q, couch_util:get_value(q, Info)) + ] ++ get_cluster_info_test_generator(Rest) + end}. + +-endif. diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 93d7d1536..9cf653c59 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -16,8 +16,9 @@ -export([open_doc/3, open_revs/4, get_doc_info/3, get_full_doc_info/3, get_missing_revs/2, get_missing_revs/3, update_docs/3]). -export([all_docs/3, changes/3, map_view/4, reduce_view/4, group_info/2]). --export([create_db/1, delete_db/1, reset_validation_funs/1, set_security/3, - set_revs_limit/3, create_shard_db_doc/2, delete_shard_db_doc/2]). +-export([create_db/1, create_db/2, delete_db/1, reset_validation_funs/1, + set_security/3, set_revs_limit/3, create_shard_db_doc/2, + delete_shard_db_doc/2]). -export([get_all_security/2, open_shard/2]). -export([compact/1, compact/2]). @@ -38,7 +39,8 @@ }). %% rpc endpoints -%% call to with_db will supply your M:F with a #db{} and then remaining args +%% call to with_db will supply your M:F with a Db instance +%% and then remaining args %% @equiv changes(DbName, Args, StartSeq, []) changes(DbName, Args, StartSeq) -> @@ -76,13 +78,13 @@ changes(DbName, Options, StartVector, DbOptions) -> args = Args, options = Options, pending = couch_db:count_changes_since(Db, StartSeq), - epochs = get_epochs(Db) + epochs = couch_db:get_epochs(Db) }, try {ok, #cacc{seq=LastSeq, pending=Pending, epochs=Epochs}} = couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0), rexi:stream_last({complete, [ - {seq, {LastSeq, uuid(Db), owner_of(LastSeq, Epochs)}}, + {seq, {LastSeq, uuid(Db), couch_db:owner_of(Epochs, LastSeq)}}, {pending, Pending} ]}) after @@ -144,7 +146,10 @@ fix_skip_and_limit(Args) -> Args#mrargs{skip=0, limit=Skip+Limit}. create_db(DbName) -> - rexi:reply(case couch_server:create(DbName, []) of + create_db(DbName, []). + +create_db(DbName, Options) -> + rexi:reply(case couch_server:create(DbName, Options) of {ok, _} -> ok; Error -> @@ -225,7 +230,7 @@ get_missing_revs(DbName, IdRevsList, Options) -> not_found -> {Id, Revs, []} end - end, IdRevsList, couch_btree:lookup(Db#db.id_tree, Ids))}; + end, IdRevsList, couch_db:get_full_doc_infos(Db, Ids))}; Error -> Error end). @@ -249,8 +254,9 @@ group_info(DbName, DDocId, DbOptions) -> reset_validation_funs(DbName) -> case get_or_create_db(DbName, []) of - {ok, #db{main_pid = Pid}} -> - gen_server:cast(Pid, {load_validation_funs, undefined}); + {ok, Db} -> + DbPid = couch_db:get_pid(Db), + gen_server:cast(DbPid, {load_validation_funs, undefined}); _ -> ok end. @@ -362,7 +368,7 @@ changes_enumerator(DocInfo, Acc) -> Opts = if Conflicts -> [conflicts | DocOptions]; true -> DocOptions end, ChangesRow = {change, [ {pending, Pending-1}, - {seq, {Seq, uuid(Db), owner_of(Seq, Epochs)}}, + {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}}, {id, Id}, {changes, Results}, {deleted, Del} | @@ -460,79 +466,20 @@ set_io_priority(DbName, Options) -> ok end. -calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) -> - Seq; -calculate_start_seq(Db, Node, {Seq, Uuid}) -> - % Treat the current node as the epoch node - calculate_start_seq(Db, Node, {Seq, Uuid, Node}); -calculate_start_seq(Db, _Node, {Seq, Uuid, EpochNode}) -> - case is_prefix(Uuid, couch_db:get_uuid(Db)) of - true -> - case is_owner(EpochNode, Seq, couch_db:get_epochs(Db)) of - true -> Seq; - false -> 0 - end; - false -> - %% The file was rebuilt, most likely in a different - %% order, so rewind. - 0 - end; -calculate_start_seq(Db, _Node, {replace, OriginalNode, Uuid, Seq}) -> - case is_prefix(Uuid, couch_db:get_uuid(Db)) of - true -> - start_seq(get_epochs(Db), OriginalNode, Seq); - false -> + +calculate_start_seq(Db, Node, Seq) -> + case couch_db:calculate_start_seq(Db, Node, Seq) of + N when is_integer(N) -> + N; + {replace, OriginalNode, Uuid, OriginalSeq} -> %% Scan history looking for an entry with %% * target_node == TargetNode %% * target_uuid == TargetUUID %% * target_seq =< TargetSeq %% If such an entry is found, stream from associated source_seq - mem3_rep:find_source_seq(Db, OriginalNode, Uuid, Seq) + mem3_rep:find_source_seq(Db, OriginalNode, Uuid, OriginalSeq) end. -is_prefix(Pattern, Subject) -> - binary:longest_common_prefix([Pattern, Subject]) == size(Pattern). - -is_owner(Node, Seq, Epochs) -> - validate_epochs(Epochs), - Node =:= owner_of(Seq, Epochs). - -owner_of(_Seq, []) -> - undefined; -owner_of(Seq, [{EpochNode, EpochSeq} | _Rest]) when Seq > EpochSeq -> - EpochNode; -owner_of(Seq, [_ | Rest]) -> - owner_of(Seq, Rest). - -get_epochs(Db) -> - Epochs = couch_db:get_epochs(Db), - validate_epochs(Epochs), - Epochs. - -start_seq([{OrigNode, EpochSeq} | _], OrigNode, Seq) when Seq > EpochSeq -> - %% OrigNode is the owner of the Seq so we can safely stream from there - Seq; -start_seq([{_, NewSeq}, {OrigNode, _} | _], OrigNode, Seq) when Seq > NewSeq -> - %% We transferred this file before Seq was written on OrigNode, so we need - %% to stream from the beginning of the next epoch. Note that it is _not_ - %% necessary for the current node to own the epoch beginning at NewSeq - NewSeq; -start_seq([_ | Rest], OrigNode, Seq) -> - start_seq(Rest, OrigNode, Seq); -start_seq([], OrigNode, Seq) -> - erlang:error({epoch_mismatch, OrigNode, Seq}). - -validate_epochs(Epochs) -> - %% Assert uniqueness. - case length(Epochs) == length(lists:ukeysort(2, Epochs)) of - true -> ok; - false -> erlang:error(duplicate_epoch) - end, - %% Assert order. - case Epochs == lists:sort(fun({_, A}, {_, B}) -> B =< A end, Epochs) of - true -> ok; - false -> erlang:error(epoch_order) - end. uuid(Db) -> Uuid = couch_db:get_uuid(Db), @@ -544,30 +491,6 @@ uuid_prefix_len() -> -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -calculate_start_seq_test() -> - %% uuid mismatch is always a rewind. - Hdr1 = couch_db_header:new(), - Hdr2 = couch_db_header:set(Hdr1, [{epochs, [{node1, 1}]}, {uuid, <<"uuid1">>}]), - ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, node1, {1, <<"uuid2">>})), - %% uuid matches and seq is owned by node. - Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]), - ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, node1, {2, <<"uuid1">>})), - %% uuids match but seq is not owned by node. - Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]), - ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, node1, {3, <<"uuid1">>})), - %% return integer if we didn't get a vector. - ?assertEqual(4, calculate_start_seq(#db{}, foo, 4)). - -is_owner_test() -> - ?assertNot(is_owner(foo, 1, [])), - ?assertNot(is_owner(foo, 1, [{foo, 1}])), - ?assert(is_owner(foo, 2, [{foo, 1}])), - ?assert(is_owner(foo, 50, [{bar, 100}, {foo, 1}])), - ?assert(is_owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])), - ?assert(is_owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])), - ?assertError(duplicate_epoch, is_owner(foo, 1, [{foo, 1}, {bar, 1}])), - ?assertError(epoch_order, is_owner(foo, 1, [{foo, 100}, {bar, 200}])). - maybe_filtered_json_doc_no_filter_test() -> Body = {[{<<"a">>, 1}]}, Doc = #doc{id = <<"1">>, revs = {1, [<<"r1">>]}, body = Body}, diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index 765561381..bf3f023db 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -305,7 +305,8 @@ path_ends_with(Path, Suffix) -> fake_db(DbName, Opts) -> {SecProps} = fabric:get_security(DbName), % as admin UserCtx = couch_util:get_value(user_ctx, Opts, #user_ctx{}), - #db{name = DbName, security = SecProps, user_ctx = UserCtx}. + {ok, Db} = couch_db:clustered_db(DbName, UserCtx, SecProps), + Db. %% test function kv(Item, Count) -> diff --git a/src/mango/src/mango_crud.erl b/src/mango/src/mango_crud.erl index 68c9d6cc4..41a4d143d 100644 --- a/src/mango/src/mango_crud.erl +++ b/src/mango/src/mango_crud.erl @@ -111,7 +111,7 @@ maybe_add_user_ctx(Db, Opts) -> {user_ctx, _} -> Opts; false -> - [{user_ctx, Db#db.user_ctx} | Opts] + [{user_ctx, couch_db:get_user_ctx(Db)} | Opts] end. diff --git a/src/mango/src/mango_cursor.erl b/src/mango/src/mango_cursor.erl index f36febdfc..e0792b737 100644 --- a/src/mango/src/mango_cursor.erl +++ b/src/mango/src/mango_cursor.erl @@ -90,9 +90,9 @@ execute(#cursor{index=Idx}=Cursor, UserFun, UserAcc) -> maybe_filter_indexes_by_ddoc(Indexes, Opts) -> case lists:keyfind(use_index, 1, Opts) of {use_index, []} -> - %We remove any indexes that have a selector + % We remove any indexes that have a selector % since they are only used when specified via use_index - remove_indexes_with_selector(Indexes); + remove_indexes_with_partial_filter_selector(Indexes); {use_index, [DesignId]} -> filter_indexes(Indexes, DesignId); {use_index, [DesignId, ViewName]} -> @@ -117,9 +117,9 @@ filter_indexes(Indexes0, DesignId, ViewName) -> lists:filter(FiltFun, Indexes). -remove_indexes_with_selector(Indexes) -> +remove_indexes_with_partial_filter_selector(Indexes) -> FiltFun = fun(Idx) -> - case mango_idx:get_idx_selector(Idx) of + case mango_idx:get_partial_filter_selector(Idx) of undefined -> true; _ -> false end diff --git a/src/mango/src/mango_cursor_text.erl b/src/mango/src/mango_cursor_text.erl index 70c911ac1..88abfc00a 100644 --- a/src/mango/src/mango_cursor_text.erl +++ b/src/mango/src/mango_cursor_text.erl @@ -51,7 +51,7 @@ create(Db, Indexes, Selector, Opts0) -> ?MANGO_ERROR(multiple_text_indexes) end, - Opts = unpack_bookmark(Db#db.name, Opts0), + Opts = unpack_bookmark(couch_db:name(Db), Opts0), DreyfusLimit = get_dreyfus_limit(), Limit = erlang:min(DreyfusLimit, couch_util:get_value(limit, Opts, mango_opts:default_limit())), @@ -98,7 +98,7 @@ execute(Cursor, UserFun, UserAcc) -> }, CAcc = #cacc{ selector = Selector, - dbname = Db#db.name, + dbname = couch_db:name(Db), ddocid = ddocid(Idx), idx_name = mango_idx:name(Idx), bookmark = get_bookmark(Opts), diff --git a/src/mango/src/mango_httpd.erl b/src/mango/src/mango_httpd.erl index d3ebf48c9..3ed51e2f0 100644 --- a/src/mango/src/mango_httpd.erl +++ b/src/mango/src/mango_httpd.erl @@ -192,7 +192,8 @@ handle_find_req(Req, _Db) -> set_user_ctx(#httpd{user_ctx=Ctx}, Db) -> - Db#db{user_ctx=Ctx}. + {ok, NewDb} = couch_db:set_user_ctx(Db, Ctx), + NewDb. get_idx_w_opts(Opts) -> diff --git a/src/mango/src/mango_idx.erl b/src/mango/src/mango_idx.erl index b8122517d..c5f870d5b 100644 --- a/src/mango/src/mango_idx.erl +++ b/src/mango/src/mango_idx.erl @@ -44,7 +44,7 @@ to_json/1, delete/4, get_usable_indexes/3, - get_idx_selector/1 + get_partial_filter_selector/1 ]). @@ -291,12 +291,12 @@ idx_mod(#idx{type = <<"text">>}) -> end. -db_to_name(#db{name=Name}) -> - Name; db_to_name(Name) when is_binary(Name) -> Name; db_to_name(Name) when is_list(Name) -> - iolist_to_binary(Name). + iolist_to_binary(Name); +db_to_name(Db) -> + couch_db:name(Db). get_idx_def(Opts) -> @@ -368,13 +368,66 @@ filter_opts([Opt | Rest]) -> [Opt | filter_opts(Rest)]. -get_idx_selector(#idx{def = Def}) when Def =:= all_docs; Def =:= undefined -> +get_partial_filter_selector(#idx{def = Def}) when Def =:= all_docs; Def =:= undefined -> undefined; -get_idx_selector(#idx{def = {Def}}) -> +get_partial_filter_selector(#idx{def = {Def}}) -> + case proplists:get_value(<<"partial_filter_selector">>, Def) of + undefined -> get_legacy_selector(Def); + {[]} -> undefined; + Selector -> Selector + end. + + +% Partial filter selectors is supported in text indexes via the selector field +% This adds backwards support for existing indexes that might have a selector in it +get_legacy_selector(Def) -> case proplists:get_value(<<"selector">>, Def) of undefined -> undefined; {[]} -> undefined; Selector -> Selector end. - +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +index(SelectorName, Selector) -> + { + idx,<<"mango_test_46418cd02081470d93290dc12306ebcb">>, + <<"_design/57e860dee471f40a2c74ea5b72997b81dda36a24">>, + <<"Selected">>,<<"json">>, + {[{<<"fields">>,{[{<<"location">>,<<"asc">>}]}}, + {SelectorName,{Selector}}]}, + [{<<"def">>,{[{<<"fields">>,[<<"location">>]}]}}] + }. + +get_partial_filter_all_docs_test() -> + Idx = #idx{def = all_docs}, + ?assertEqual(undefined, get_partial_filter_selector(Idx)). + +get_partial_filter_undefined_def_test() -> + Idx = #idx{def = undefined}, + ?assertEqual(undefined, get_partial_filter_selector(Idx)). + +get_partial_filter_selector_default_test() -> + Idx = index(<<"partial_filter_selector">>, []), + ?assertEqual(undefined, get_partial_filter_selector(Idx)). + +get_partial_filter_selector_missing_test() -> + Idx = index(<<"partial_filter_selector">>, []), + ?assertEqual(undefined, get_partial_filter_selector(Idx)). + +get_partial_filter_selector_with_selector_test() -> + Selector = [{<<"location">>,{[{<<"$gt">>,<<"FRA">>}]}}], + Idx = index(<<"partial_filter_selector">>, Selector), + ?assertEqual({Selector}, get_partial_filter_selector(Idx)). + +get_partial_filter_selector_with_legacy_selector_test() -> + Selector = [{<<"location">>,{[{<<"$gt">>,<<"FRA">>}]}}], + Idx = index(<<"selector">>, Selector), + ?assertEqual({Selector}, get_partial_filter_selector(Idx)). + +get_partial_filter_selector_with_legacy_default_selector_test() -> + Idx = index(<<"selector">>, []), + ?assertEqual(undefined, get_partial_filter_selector(Idx)). + +-endif. diff --git a/src/mango/src/mango_idx_text.erl b/src/mango/src/mango_idx_text.erl index f90ac7fac..e00c241d2 100644 --- a/src/mango/src/mango_idx_text.erl +++ b/src/mango/src/mango_idx_text.erl @@ -223,7 +223,13 @@ opts() -> {optional, true}, {default, {[]}} ]}, - {<<"selector">>, [ + {<<"partial_filter_selector">>, [ + {tag, partial_filter_selector}, + {optional, true}, + {default, {[]}}, + {validator, fun mango_opts:validate_selector/1} + ]}, + {<<"selector">>, [ {tag, selector}, {optional, true}, {default, {[]}}, @@ -344,8 +350,9 @@ indexable_fields(Fields, {op_default, _}) -> [<<"$default">> | Fields]. -maybe_reject_index_all_req({Def}, #db{name=DbName, user_ctx=Ctx}) -> - User = Ctx#user_ctx.name, +maybe_reject_index_all_req({Def}, Db) -> + DbName = couch_db:name(Db), + #user_ctx{name = User} = couch_db:get_user_ctx(Db), Fields = couch_util:get_value(fields, Def), case {Fields, forbid_index_all()} of {all_fields, "true"} -> @@ -374,7 +381,9 @@ setup() -> end), %default index all def that generates {fields, all_fields} Index = #idx{def={[]}}, - Db = #db{name = <<"testdb">>, user_ctx=#user_ctx{name = <<"u1">>}}, + DbName = <<"testdb">>, + UserCtx = #user_ctx{name = <<"u1">>}, + {ok, Db} = couch_db:clustered_db(DbName, UserCtx), {Index, Db, Ctx}. diff --git a/src/mango/src/mango_idx_view.erl b/src/mango/src/mango_idx_view.erl index d5dcd0c07..4cb039c4a 100644 --- a/src/mango/src/mango_idx_view.erl +++ b/src/mango/src/mango_idx_view.erl @@ -114,11 +114,12 @@ columns(Idx) -> is_usable(Idx, Selector) -> - % This index is usable if at least the first column is - % a member of the indexable fields of the selector. - Columns = columns(Idx), - Fields = indexable_fields(Selector), - lists:member(hd(Columns), Fields) andalso not is_text_search(Selector). + % This index is usable if all of the columns are + % restricted by the selector such that they are required to exist + % and the selector is not a text search (so requires a text index) + RequiredFields = columns(Idx), + mango_selector:has_required_fields(Selector, RequiredFields) + andalso not is_text_search(Selector). is_text_search({[]}) -> @@ -198,8 +199,8 @@ opts() -> {tag, fields}, {validator, fun mango_opts:validate_sort/1} ]}, - {<<"selector">>, [ - {tag, selector}, + {<<"partial_filter_selector">>, [ + {tag, partial_filter_selector}, {optional, true}, {default, {[]}}, {validator, fun mango_opts:validate_selector/1} diff --git a/src/mango/src/mango_native_proc.erl b/src/mango/src/mango_native_proc.erl index 82081a976..61d79b7ec 100644 --- a/src/mango/src/mango_native_proc.erl +++ b/src/mango/src/mango_native_proc.erl @@ -135,7 +135,7 @@ index_doc(#st{indexes=Indexes}, Doc) -> get_index_entries({IdxProps}, Doc) -> {Fields} = couch_util:get_value(<<"fields">>, IdxProps), - Selector = get_index_selector(IdxProps), + Selector = get_index_partial_filter_selector(IdxProps), case should_index(Selector, Doc) of false -> []; @@ -159,7 +159,7 @@ get_index_values(Fields, Doc) -> get_text_entries({IdxProps}, Doc) -> - Selector = get_index_selector(IdxProps), + Selector = get_index_partial_filter_selector(IdxProps), case should_index(Selector, Doc) of true -> get_text_entries0(IdxProps, Doc); @@ -168,10 +168,21 @@ get_text_entries({IdxProps}, Doc) -> end. -get_index_selector(IdxProps) -> - case couch_util:get_value(<<"selector">>, IdxProps) of - [] -> {[]}; - Else -> Else +get_index_partial_filter_selector(IdxProps) -> + case couch_util:get_value(<<"partial_filter_selector">>, IdxProps) of + undefined -> + % this is to support legacy text indexes that had the partial_filter_selector + % set as selector + case couch_util:get_value(<<"selector">>, IdxProps, []) of + [] -> + {[]}; + Else -> + Else + end; + [] -> + {[]}; + Else -> + Else end. diff --git a/src/mango/src/mango_selector.erl b/src/mango/src/mango_selector.erl index bcf347201..fe3998683 100644 --- a/src/mango/src/mango_selector.erl +++ b/src/mango/src/mango_selector.erl @@ -15,7 +15,8 @@ -export([ normalize/1, - match/2 + match/2, + has_required_fields/2 ]). @@ -566,3 +567,110 @@ match({[{Field, Cond}]}, Value, Cmp) -> match({[_, _ | _] = _Props} = Sel, _Value, _Cmp) -> erlang:error({unnormalized_selector, Sel}). + + +% Returns true if Selector requires all +% fields in RequiredFields to exist in any matching documents. + +% For each condition in the selector, check +% whether the field is in RequiredFields. +% If it is, remove it from RequiredFields and continue +% until we match then all or run out of selector to +% match against. + +% Empty selector +has_required_fields({[]}, _) -> + false; + +% No more required fields +has_required_fields(_, []) -> + true; + +% No more selector +has_required_fields([], _) -> + false; + +has_required_fields(Selector, RequiredFields) when not is_list(Selector) -> + has_required_fields([Selector], RequiredFields); + +% We can "see" through $and operator. We ignore other +% combination operators because they can't be used to restrict +% an index. +has_required_fields([{[{<<"$and">>, Args}]}], RequiredFields) + when is_list(Args) -> + has_required_fields(Args, RequiredFields); + +has_required_fields([{[{Field, Cond}]} | Rest], RequiredFields) -> + case Cond of + % $exists:false is a special case - this is the only operator + % that explicitly does not require a field to exist + {[{<<"$exists">>, false}]} -> + has_required_fields(Rest, RequiredFields); + _ -> + has_required_fields(Rest, lists:delete(Field, RequiredFields)) + end. + + +%%%%%%%% module tests below %%%%%%%% + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +has_required_fields_basic_test() -> + RequiredFields = [<<"A">>], + Selector = {[{<<"A">>, <<"foo">>}]}, + Normalized = normalize(Selector), + ?assertEqual(true, has_required_fields(Normalized, RequiredFields)). + +has_required_fields_basic_failure_test() -> + RequiredFields = [<<"B">>], + Selector = {[{<<"A">>, <<"foo">>}]}, + Normalized = normalize(Selector), + ?assertEqual(false, has_required_fields(Normalized, RequiredFields)). + +has_required_fields_empty_selector_test() -> + RequiredFields = [<<"A">>], + Selector = {[]}, + Normalized = normalize(Selector), + ?assertEqual(false, has_required_fields(Normalized, RequiredFields)). + +has_required_fields_exists_false_test() -> + RequiredFields = [<<"A">>], + Selector = {[{<<"A">>,{[{<<"$exists">>, false}]}}]}, + Normalized = normalize(Selector), + ?assertEqual(false, has_required_fields(Normalized, RequiredFields)). + +has_required_fields_and_true_test() -> + RequiredFields = [<<"A">>], + Selector = {[{<<"$and">>, + [ + {[{<<"A">>, <<"foo">>}]}, + {[{<<"B">>, <<"foo">>}]} + ] + }]}, + Normalized = normalize(Selector), + ?assertEqual(true, has_required_fields(Normalized, RequiredFields)). + +has_required_fields_and_false_test() -> + RequiredFields = [<<"A">>, <<"C">>], + Selector = {[{<<"$and">>, + [ + {[{<<"A">>, <<"foo">>}]}, + {[{<<"B">>, <<"foo">>}]} + ] + }]}, + Normalized = normalize(Selector), + ?assertEqual(false, has_required_fields(Normalized, RequiredFields)). + +has_required_fields_or_test() -> + RequiredFields = [<<"A">>], + Selector = {[{<<"$or">>, + [ + {[{<<"A">>, <<"foo">>}]}, + {[{<<"B">>, <<"foo">>}]} + ] + }]}, + Normalized = normalize(Selector), + ?assertEqual(false, has_required_fields(Normalized, RequiredFields)). + +-endif.
\ No newline at end of file diff --git a/src/mango/test/01-index-crud-test.py b/src/mango/test/01-index-crud-test.py index 6582020f5..617bfd523 100644 --- a/src/mango/test/01-index-crud-test.py +++ b/src/mango/test/01-index-crud-test.py @@ -35,7 +35,7 @@ class IndexCrudTests(mango.DbPerClass): for fields in bad_fields: try: self.db.create_index(fields) - except Exception, e: + except Exception as e: assert e.response.status_code == 400 else: raise AssertionError("bad create index") @@ -54,7 +54,7 @@ class IndexCrudTests(mango.DbPerClass): for bt in bad_types: try: self.db.create_index(["foo"], idx_type=bt) - except Exception, e: + except Exception as e: assert e.response.status_code == 400, (bt, e.response.status_code) else: raise AssertionError("bad create index") @@ -70,13 +70,13 @@ class IndexCrudTests(mango.DbPerClass): for bn in bad_names: try: self.db.create_index(["foo"], name=bn) - except Exception, e: + except Exception as e: assert e.response.status_code == 400 else: raise AssertionError("bad create index") try: self.db.create_index(["foo"], ddoc=bn) - except Exception, e: + except Exception as e: assert e.response.status_code == 400 else: raise AssertionError("bad create index") @@ -207,7 +207,7 @@ class IndexCrudTests(mango.DbPerClass): # Missing design doc try: self.db.delete_index("this_is_not_a_design_doc_id", "foo") - except Exception, e: + except Exception as e: assert e.response.status_code == 404 else: raise AssertionError("bad index delete") @@ -220,7 +220,7 @@ class IndexCrudTests(mango.DbPerClass): ddocid = idx["ddoc"].split("/")[-1] try: self.db.delete_index(ddocid, "this_is_not_an_index_name") - except Exception, e: + except Exception as e: assert e.response.status_code == 404 else: raise AssertionError("bad index delete") @@ -228,7 +228,7 @@ class IndexCrudTests(mango.DbPerClass): # Bad view type try: self.db.delete_index(ddocid, idx["name"], idx_type="not_a_real_type") - except Exception, e: + except Exception as e: assert e.response.status_code == 404 else: raise AssertionError("bad index delete") @@ -244,7 +244,6 @@ class IndexCrudTests(mango.DbPerClass): for idx in self.db.list_indexes(): if idx["name"] != "text_idx_01": continue - print idx["def"] assert idx["def"]["fields"] == [ {"stringidx": "string"}, {"booleanidx": "boolean"} @@ -270,7 +269,7 @@ class IndexCrudTests(mango.DbPerClass): for fields in bad_fields: try: self.db.create_text_index(fields=fields) - except Exception, e: + except Exception as e: assert e.response.status_code == 400 else: raise AssertionError("bad create text index") @@ -310,10 +309,10 @@ class IndexCrudTests(mango.DbPerClass): try: self.db.list_indexes(skip=-1) - except Exception, e: + except Exception as e: assert e.response.status_code == 500 try: self.db.list_indexes(limit=0) - except Exception, e: + except Exception as e: assert e.response.status_code == 500 diff --git a/src/mango/test/02-basic-find-test.py b/src/mango/test/02-basic-find-test.py index 699166e28..a8725ffa8 100644 --- a/src/mango/test/02-basic-find-test.py +++ b/src/mango/test/02-basic-find-test.py @@ -30,7 +30,7 @@ class BasicFindTests(mango.UserDocsTests): for bs in bad_selectors: try: self.db.find(bs) - except Exception, e: + except Exception as e: assert e.response.status_code == 400 else: raise AssertionError("bad find") @@ -49,7 +49,7 @@ class BasicFindTests(mango.UserDocsTests): for bl in bad_limits: try: self.db.find({"int":{"$gt":2}}, limit=bl) - except Exception, e: + except Exception as e: assert e.response.status_code == 400 else: raise AssertionError("bad find") @@ -68,7 +68,7 @@ class BasicFindTests(mango.UserDocsTests): for bs in bad_skips: try: self.db.find({"int":{"$gt":2}}, skip=bs) - except Exception, e: + except Exception as e: assert e.response.status_code == 400 else: raise AssertionError("bad find") @@ -88,7 +88,7 @@ class BasicFindTests(mango.UserDocsTests): for bs in bad_sorts: try: self.db.find({"int":{"$gt":2}}, sort=bs) - except Exception, e: + except Exception as e: assert e.response.status_code == 400 else: raise AssertionError("bad find") @@ -108,7 +108,7 @@ class BasicFindTests(mango.UserDocsTests): for bf in bad_fields: try: self.db.find({"int":{"$gt":2}}, fields=bf) - except Exception, e: + except Exception as e: assert e.response.status_code == 400 else: raise AssertionError("bad find") @@ -126,7 +126,7 @@ class BasicFindTests(mango.UserDocsTests): for br in bad_rs: try: self.db.find({"int":{"$gt":2}}, r=br) - except Exception, e: + except Exception as e: assert e.response.status_code == 400 else: raise AssertionError("bad find") @@ -142,7 +142,7 @@ class BasicFindTests(mango.UserDocsTests): for bc in bad_conflicts: try: self.db.find({"int":{"$gt":2}}, conflicts=bc) - except Exception, e: + except Exception as e: assert e.response.status_code == 400 else: raise AssertionError("bad find") diff --git a/src/mango/test/03-operator-test.py b/src/mango/test/03-operator-test.py index 863752682..1af39f205 100644 --- a/src/mango/test/03-operator-test.py +++ b/src/mango/test/03-operator-test.py @@ -26,8 +26,8 @@ class OperatorTests: "manager": True, "favorites": {"$all": ["Lisp", "Python"]} }) - self.assertEqual(len(docs), 4) - user_ids = [2,12,9,14] + self.assertEqual(len(docs), 3) + user_ids = [2,12,9] self.assertUserIds(user_ids, docs) def test_all_non_array(self): @@ -124,7 +124,7 @@ class OperatorTests: "manager": True, "favorites": {"$in": ["Ruby", "Python"]} }) - self.assertUserIds([2,6,7,9,11,12,14], docs) + self.assertUserIds([2,6,7,9,11,12], docs) def test_nin_operator_array(self): docs = self.db.find({ diff --git a/src/mango/test/05-index-selection-test.py b/src/mango/test/05-index-selection-test.py index 2fb0a405b..1cc210382 100644 --- a/src/mango/test/05-index-selection-test.py +++ b/src/mango/test/05-index-selection-test.py @@ -23,7 +23,7 @@ class IndexSelectionTests(mango.UserDocsTests): user_docs.add_text_indexes(klass.db, {}) def test_basic(self): - resp = self.db.find({"name.last": "A last name"}, explain=True) + resp = self.db.find({"age": 123}, explain=True) self.assertEqual(resp["index"]["type"], "json") def test_with_and(self): @@ -77,7 +77,7 @@ class IndexSelectionTests(mango.UserDocsTests): def test_no_valid_sort_index(self): try: self.db.find({"_id": {"$gt": None}}, sort=["name"], return_raw=True) - except Exception, e: + except Exception as e: self.assertEqual(e.response.status_code, 400) else: raise AssertionError("bad find") @@ -87,11 +87,83 @@ class IndexSelectionTests(mango.UserDocsTests): ddocid = "_design/ad3d537c03cd7c6a43cf8dff66ef70ea54c2b40f" try: self.db.find({}, use_index=ddocid) - except Exception, e: + except Exception as e: self.assertEqual(e.response.status_code, 400) else: raise AssertionError("bad find") + def test_uses_all_docs_when_fields_do_not_match_selector(self): + # index exists on ["company", "manager"] but not ["company"] + # so we should fall back to all docs (so we include docs + # with no "manager" field) + selector = { + "company": "Pharmex" + } + docs = self.db.find(selector) + self.assertEqual(len(docs), 1) + self.assertEqual(docs[0]["company"], "Pharmex") + self.assertNotIn("manager", docs[0]) + + resp_explain = self.db.find(selector, explain=True) + self.assertEqual(resp_explain["index"]["type"], "special") + + def test_uses_all_docs_when_selector_doesnt_require_fields_to_exist(self): + # as in test above, use a selector that doesn't overlap with the index + # due to an explicit exists clause + selector = { + "company": "Pharmex", + "manager": {"$exists": False} + } + docs = self.db.find(selector) + self.assertEqual(len(docs), 1) + self.assertEqual(docs[0]["company"], "Pharmex") + self.assertNotIn("manager", docs[0]) + + resp_explain = self.db.find(selector, explain=True) + self.assertEqual(resp_explain["index"]["type"], "special") + + def test_uses_index_when_no_range_or_equals(self): + # index on ["manager"] should be valid because + # selector requires "manager" to exist. The + # selector doesn't narrow the keyrange so it's + # a full index scan + selector = { + "manager": {"$exists": True} + } + docs = self.db.find(selector) + self.assertEqual(len(docs), 14) + + resp_explain = self.db.find(selector, explain=True) + self.assertEqual(resp_explain["index"]["type"], "json") + + + def test_reject_use_index_invalid_fields(self): + # index on ["company","manager"] which should not be valid + ddocid = "_design/a0c425a60cf3c3c09e3c537c9ef20059dcef9198" + selector = { + "company": "Pharmex" + } + try: + self.db.find(selector, use_index=ddocid) + except Exception as e: + self.assertEqual(e.response.status_code, 400) + else: + raise AssertionError("did not reject bad use_index") + + def test_reject_use_index_sort_order(self): + # index on ["company","manager"] which should not be valid + ddocid = "_design/a0c425a60cf3c3c09e3c537c9ef20059dcef9198" + selector = { + "company": {"$gt": None}, + "manager": {"$gt": None} + } + try: + self.db.find(selector, use_index=ddocid, sort=[{"manager":"desc"}]) + except Exception as e: + self.assertEqual(e.response.status_code, 400) + else: + raise AssertionError("did not reject bad use_index") + # This doc will not be saved given the new ddoc validation code # in couch_mrview def test_manual_bad_view_idx01(self): @@ -178,7 +250,7 @@ class MultiTextIndexSelectionTests(mango.UserDocsTests): def test_multi_text_index_is_error(self): try: self.db.find({"$text": "a query"}, explain=True) - except Exception, e: + except Exception as e: self.assertEqual(e.response.status_code, 400) def test_use_index_works(self): diff --git a/src/mango/test/06-basic-text-test.py b/src/mango/test/06-basic-text-test.py index 7f5ce6345..c02950c46 100644 --- a/src/mango/test/06-basic-text-test.py +++ b/src/mango/test/06-basic-text-test.py @@ -64,7 +64,6 @@ class BasicTextTests(mango.UserDocsTextTests): # Nested Level docs = self.db.find({"favorites.0.2": "Python"}) - print len(docs) assert len(docs) == 1 for d in docs: assert "Python" in d["favorites"][0][2] @@ -451,14 +450,12 @@ class ElemMatchTests(mango.FriendDocsTextTests): } } docs = self.db.find(q) - print len(docs) assert len(docs) == 1 assert docs[0]["bestfriends"] == ["Wolverine", "Cyclops"] q = {"results": {"$elemMatch": {"$gte": 80, "$lt": 85}}} docs = self.db.find(q) - print len(docs) assert len(docs) == 1 assert docs[0]["results"] == [82, 85, 88] diff --git a/src/mango/test/07-text-custom-field-list-test.py b/src/mango/test/07-text-custom-field-list-test.py index 50a5c0522..a43e33003 100644 --- a/src/mango/test/07-text-custom-field-list-test.py +++ b/src/mango/test/07-text-custom-field-list-test.py @@ -56,7 +56,7 @@ class CustomFieldsTest(mango.UserDocsTextTests): try: self.db.find({"selector": {"$or": [{"favorites": "Ruby"}, {"favorites.0":"Ruby"}]}}) - except Exception, e: + except Exception as e: assert e.response.status_code == 400 def test_in_with_array(self): @@ -82,7 +82,7 @@ class CustomFieldsTest(mango.UserDocsTextTests): vals = ["Random Garbage", 52, {"Versions": {"Alpha": "Beta"}}] try: self.db.find({"favorites": {"$in": vals}}) - except Exception, e: + except Exception as e: assert e.response.status_code == 400 def test_nin_with_array(self): diff --git a/src/mango/test/08-text-limit-test.py b/src/mango/test/08-text-limit-test.py index 191a1108a..4bc87b4b9 100644 --- a/src/mango/test/08-text-limit-test.py +++ b/src/mango/test/08-text-limit-test.py @@ -47,7 +47,6 @@ class LimitTests(mango.LimitDocsTextTests): def test_limit_field5(self): q = {"age": {"$exists": True}} docs = self.db.find(q, limit=250) - print len(docs) assert len(docs) == 75 for d in docs: assert d["age"] < 100 @@ -78,7 +77,7 @@ class LimitTests(mango.LimitDocsTextTests): q = {"$or": [{"user_id" : {"$lt" : 100}}, {"filtered_array.[]": 1}]} try: self.db.find(q, limit=-1) - except Exception, e: + except Exception as e: assert e.response.status_code == 400 else: raise AssertionError("Should have thrown error for negative limit") @@ -87,7 +86,7 @@ class LimitTests(mango.LimitDocsTextTests): q = {"$or": [{"user_id" : {"$lt" : 100}}, {"filtered_array.[]": 1}]} try: self.db.find(q, skip=-1) - except Exception, e: + except Exception as e: assert e.response.status_code == 400 else: raise AssertionError("Should have thrown error for negative skip") @@ -102,7 +101,6 @@ class LimitTests(mango.LimitDocsTextTests): def run_bookmark_check(self, size): - print size q = {"age": {"$gt": 0}} seen_docs = set() bm = None diff --git a/src/mango/test/09-text-sort-test.py b/src/mango/test/09-text-sort-test.py index ae36a6a33..1c5557227 100644 --- a/src/mango/test/09-text-sort-test.py +++ b/src/mango/test/09-text-sort-test.py @@ -43,7 +43,7 @@ class SortTests(mango.UserDocsTextTests): q = {"email": {"$gt": None}} try: self.db.find(q, sort=["email"]) - except Exception, e: + except Exception as e: assert e.response.status_code == 400 else: raise AssertionError("Should have thrown error for sort") @@ -79,7 +79,7 @@ class SortTests(mango.UserDocsTextTests): {"age": "34"}]} try: self.db.find(q, sort=["age"]) - except Exception, e: + except Exception as e: assert e.response.status_code == 400 else: raise AssertionError("Should have thrown error for sort") diff --git a/src/mango/test/14-json-pagination.py b/src/mango/test/14-json-pagination.py index ddac15662..ea06e0a2a 100644 --- a/src/mango/test/14-json-pagination.py +++ b/src/mango/test/14-json-pagination.py @@ -159,7 +159,7 @@ class PaginateJsonDocs(mango.DbPerClass): def test_bad_bookmark(self): try: self.db.find({"_id": {"$gt": 0}}, bookmark="bad-bookmark") - except Exception, e: + except Exception as e: resp = e.response.json() assert resp["error"] == "invalid_bookmark" assert resp["reason"] == "Invalid bookmark value: \"bad-bookmark\"" @@ -171,7 +171,7 @@ class PaginateJsonDocs(mango.DbPerClass): bookmark = 'g2wAAAABaANkABFub2RlMUBjb3VjaGRiLm5ldGwAAAACYQBiP____2poAkY_8AAAAAAAAGEHag' try: self.db.find({"_id": {"$gt": 0}}, bookmark=bookmark) - except Exception, e: + except Exception as e: resp = e.response.json() assert resp["error"] == "invalid_bookmark" assert e.response.status_code == 400 diff --git a/src/mango/test/16-index-selectors.py b/src/mango/test/16-index-selectors.py index b18945609..3ce659ecf 100644 --- a/src/mango/test/16-index-selectors.py +++ b/src/mango/test/16-index-selectors.py @@ -73,20 +73,84 @@ DOCS = [ }, ] +oldschoolddoc = { + "_id": "_design/oldschool", + "language": "query", + "views": { + "oldschool": { + "map": { + "fields": { + "location": "asc" + }, + "selector": { + "location": {"$gte": "FRA"} + } + }, + "reduce": "_count", + "options": { + "def": { + "fields": [ + "location" + ] + } + } + } + } +} + +oldschoolddoctext = { + "_id": "_design/oldschooltext", + "language": "query", + "indexes": { + "oldschooltext": { + "index": { + "default_analyzer": "keyword", + "default_field": {}, + "selector": { + "location": {"$gte": "FRA"} + }, + "fields": [ + { + "name": "location", + "type": "string" + } + ], + "index_array_lengths": True + }, + "analyzer": { + "name": "perfield", + "default": "keyword", + "fields": { + "$default": "standard" + } + } + } + } +} + class IndexSelectorJson(mango.DbPerClass): def setUp(self): self.db.recreate() self.db.save_docs(copy.deepcopy(DOCS)) - def test_saves_selector_in_index(self): + def test_saves_partial_filter_selector_in_index(self): selector = {"location": {"$gte": "FRA"}} - self.db.create_index(["location"], selector=selector) + self.db.create_index(["location"], partial_filter_selector=selector) indexes = self.db.list_indexes() - self.assertEqual(indexes[1]["def"]["selector"], selector) + self.assertEqual(indexes[1]["def"]["partial_filter_selector"], selector) + + def test_saves_selector_in_index_throws(self): + selector = {"location": {"$gte": "FRA"}} + try: + self.db.create_index(["location"], selector=selector) + except Exception as e: + assert e.response.status_code == 400 + else: + raise AssertionError("bad index creation") def test_uses_partial_index_for_query_selector(self): selector = {"location": {"$gte": "FRA"}} - self.db.create_index(["location"], selector=selector, ddoc="Selected", name="Selected") + self.db.create_index(["location"], partial_filter_selector=selector, ddoc="Selected", name="Selected") resp = self.db.find(selector, explain=True, use_index='Selected') self.assertEqual(resp["index"]["name"], "Selected") docs = self.db.find(selector, use_index='Selected') @@ -95,7 +159,7 @@ class IndexSelectorJson(mango.DbPerClass): def test_uses_partial_index_with_different_selector(self): selector = {"location": {"$gte": "FRA"}} selector2 = {"location": {"$gte": "A"}} - self.db.create_index(["location"], selector=selector, ddoc="Selected", name="Selected") + self.db.create_index(["location"], partial_filter_selector=selector, ddoc="Selected", name="Selected") resp = self.db.find(selector2, explain=True, use_index='Selected') self.assertEqual(resp["index"]["name"], "Selected") docs = self.db.find(selector2, use_index='Selected') @@ -103,28 +167,36 @@ class IndexSelectorJson(mango.DbPerClass): def test_doesnot_use_selector_when_not_specified(self): selector = {"location": {"$gte": "FRA"}} - self.db.create_index(["location"], selector=selector, ddoc="Selected", name="Selected") + self.db.create_index(["location"], partial_filter_selector=selector, ddoc="Selected", name="Selected") resp = self.db.find(selector, explain=True) self.assertEqual(resp["index"]["name"], "_all_docs") def test_doesnot_use_selector_when_not_specified_with_index(self): selector = {"location": {"$gte": "FRA"}} - self.db.create_index(["location"], selector=selector, ddoc="Selected", name="Selected") + self.db.create_index(["location"], partial_filter_selector=selector, ddoc="Selected", name="Selected") self.db.create_index(["location"], name="NotSelected") resp = self.db.find(selector, explain=True) self.assertEqual(resp["index"]["name"], "NotSelected") + def test_old_selector_still_supported(self): + selector = {"location": {"$gte": "FRA"}} + self.db.save_doc(oldschoolddoc) + resp = self.db.find(selector, explain=True, use_index='oldschool') + self.assertEqual(resp["index"]["name"], "oldschool") + docs = self.db.find(selector, use_index='oldschool') + self.assertEqual(len(docs), 3) + @unittest.skipUnless(mango.has_text_service(), "requires text service") - def test_text_saves_selector_in_index(self): + def test_text_saves_partialfilterselector_in_index(self): selector = {"location": {"$gte": "FRA"}} - self.db.create_text_index(fields=[{"name":"location", "type":"string"}], selector=selector) + self.db.create_text_index(fields=[{"name":"location", "type":"string"}], partial_filter_selector=selector) indexes = self.db.list_indexes() - self.assertEqual(indexes[1]["def"]["selector"], selector) + self.assertEqual(indexes[1]["def"]["partial_filter_selector"], selector) @unittest.skipUnless(mango.has_text_service(), "requires text service") def test_text_uses_partial_index_for_query_selector(self): selector = {"location": {"$gte": "FRA"}} - self.db.create_text_index(fields=[{"name":"location", "type":"string"}], selector=selector, ddoc="Selected", name="Selected") + self.db.create_text_index(fields=[{"name":"location", "type":"string"}], partial_filter_selector=selector, ddoc="Selected", name="Selected") resp = self.db.find(selector, explain=True, use_index='Selected') self.assertEqual(resp["index"]["name"], "Selected") docs = self.db.find(selector, use_index='Selected', fields=['_id', 'location']) @@ -134,7 +206,7 @@ class IndexSelectorJson(mango.DbPerClass): def test_text_uses_partial_index_with_different_selector(self): selector = {"location": {"$gte": "FRA"}} selector2 = {"location": {"$gte": "A"}} - self.db.create_text_index(fields=[{"name":"location", "type":"string"}], selector=selector, ddoc="Selected", name="Selected") + self.db.create_text_index(fields=[{"name":"location", "type":"string"}], partial_filter_selector=selector, ddoc="Selected", name="Selected") resp = self.db.find(selector2, explain=True, use_index='Selected') self.assertEqual(resp["index"]["name"], "Selected") docs = self.db.find(selector2, use_index='Selected') @@ -143,14 +215,23 @@ class IndexSelectorJson(mango.DbPerClass): @unittest.skipUnless(mango.has_text_service(), "requires text service") def test_text_doesnot_use_selector_when_not_specified(self): selector = {"location": {"$gte": "FRA"}} - self.db.create_text_index(fields=[{"name":"location", "type":"string"}], selector=selector, ddoc="Selected", name="Selected") + self.db.create_text_index(fields=[{"name":"location", "type":"string"}], partial_filter_selector=selector, ddoc="Selected", name="Selected") resp = self.db.find(selector, explain=True) self.assertEqual(resp["index"]["name"], "_all_docs") @unittest.skipUnless(mango.has_text_service(), "requires text service") def test_text_doesnot_use_selector_when_not_specified_with_index(self): selector = {"location": {"$gte": "FRA"}} - self.db.create_text_index(fields=[{"name":"location", "type":"string"}], selector=selector, ddoc="Selected", name="Selected") + self.db.create_text_index(fields=[{"name":"location", "type":"string"}], partial_filter_selector=selector, ddoc="Selected", name="Selected") self.db.create_text_index(fields=[{"name":"location", "type":"string"}], name="NotSelected") resp = self.db.find(selector, explain=True) - self.assertEqual(resp["index"]["name"], "NotSelected")
\ No newline at end of file + self.assertEqual(resp["index"]["name"], "NotSelected") + + @unittest.skipUnless(mango.has_text_service(), "requires text service") + def test_text_old_selector_still_supported(self): + selector = {"location": {"$gte": "FRA"}} + self.db.save_doc(oldschoolddoctext) + resp = self.db.find(selector, explain=True, use_index='oldschooltext') + self.assertEqual(resp["index"]["name"], "oldschooltext") + docs = self.db.find(selector, use_index='oldschooltext') + self.assertEqual(len(docs), 3)
\ No newline at end of file diff --git a/src/mango/test/mango.py b/src/mango/test/mango.py index 2c8971485..a275a23d0 100644 --- a/src/mango/test/mango.py +++ b/src/mango/test/mango.py @@ -44,7 +44,7 @@ class Database(object): return "http://{}:{}/{}".format(self.host, self.port, self.dbname) def path(self, parts): - if isinstance(parts, (str, unicode)): + if isinstance(parts, ("".__class__, u"".__class__)): parts = [parts] return "/".join([self.url] + parts) @@ -84,7 +84,8 @@ class Database(object): r.raise_for_status() return r.json() - def create_index(self, fields, idx_type="json", name=None, ddoc=None, selector=None): + def create_index(self, fields, idx_type="json", name=None, ddoc=None, + partial_filter_selector=None, selector=None): body = { "index": { "fields": fields @@ -98,6 +99,8 @@ class Database(object): body["ddoc"] = ddoc if selector is not None: body["index"]["selector"] = selector + if partial_filter_selector is not None: + body["index"]["partial_filter_selector"] = partial_filter_selector body = json.dumps(body) r = self.sess.post(self.path("_index"), data=body) r.raise_for_status() @@ -105,8 +108,9 @@ class Database(object): assert r.json()["name"] is not None return r.json()["result"] == "created" - def create_text_index(self, analyzer=None, selector=None, idx_type="text", - default_field=None, fields=None, name=None, ddoc=None,index_array_lengths=None): + def create_text_index(self, analyzer=None, idx_type="text", + partial_filter_selector=None, default_field=None, fields=None, + name=None, ddoc=None,index_array_lengths=None): body = { "index": { }, @@ -121,8 +125,8 @@ class Database(object): body["index"]["default_field"] = default_field if index_array_lengths is not None: body["index"]["index_array_lengths"] = index_array_lengths - if selector is not None: - body["index"]["selector"] = selector + if partial_filter_selector is not None: + body["index"]["partial_filter_selector"] = partial_filter_selector if fields is not None: body["index"]["fields"] = fields if ddoc is not None: diff --git a/src/mango/test/user_docs.py b/src/mango/test/user_docs.py index 9896e5596..02ffe9ffc 100644 --- a/src/mango/test/user_docs.py +++ b/src/mango/test/user_docs.py @@ -493,7 +493,6 @@ DOCS = [ }, "company": "Pharmex", "email": "faithhess@pharmex.com", - "manager": True, "favorites": [ "Erlang", "Python", diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl index 41278025b..e2cbb2ec6 100644 --- a/src/mem3/src/mem3.erl +++ b/src/mem3/src/mem3.erl @@ -145,13 +145,13 @@ get_shard(DbName, Node, Range) -> local_shards(DbName) -> mem3_shards:local(DbName). -shard_suffix(#db{name=DbName}) -> - shard_suffix(DbName); -shard_suffix(DbName0) -> +shard_suffix(DbName0) when is_binary(DbName0) -> Shard = hd(shards(DbName0)), <<"shards/", _:8/binary, "-", _:8/binary, "/", DbName/binary>> = Shard#shard.name, - filename:extension(binary_to_list(DbName)). + filename:extension(binary_to_list(DbName)); +shard_suffix(Db) -> + shard_suffix(couch_db:name(Db)). fold_shards(Fun, Acc) -> mem3_shards:fold(Fun, Acc). @@ -295,10 +295,11 @@ group_by_range(Shards) -> % quorum functions -quorum(#db{name=DbName}) -> - quorum(DbName); -quorum(DbName) -> - n(DbName) div 2 + 1. +quorum(DbName) when is_binary(DbName) -> + n(DbName) div 2 + 1; +quorum(Db) -> + quorum(couch_db:name(Db)). + node(#shard{node=Node}) -> Node; diff --git a/src/mem3/src/mem3_cluster.erl b/src/mem3/src/mem3_cluster.erl new file mode 100644 index 000000000..7e3d477cb --- /dev/null +++ b/src/mem3/src/mem3_cluster.erl @@ -0,0 +1,161 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +% Maintain cluster stability information. A cluster is considered stable if there +% were no changes to during a given period of time. +% +% To be notified of cluster stability / instability the owner module must +% implement the mem3_cluster behavior. When cluster membership changes, +% cluster_unstable behavior callback will be called. After that is are no more +% changes to the cluster, then cluster_stable callback will be called. +% +% The period is passed in as start argument but it can also be set dynamically +% via the set_period/2 API call. +% +% In some cases it might be useful to have a shorter pariod during startup. +% That can be configured via the StartPeriod argument. If the time since start +% is less than a full period, then the StartPeriod is used as the period. + + +-module(mem3_cluster). + +-behaviour(gen_server). + +-export([ + start_link/4, + set_period/2 +]). + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + + +-callback cluster_stable(Context :: term()) -> NewContext :: term(). +-callback cluster_unstable(Context :: term()) -> NewContext :: term(). + + +-record(state, { + mod :: atom(), + ctx :: term(), + start_time :: erlang:timestamp(), + last_change :: erlang:timestamp(), + period :: integer(), + start_period :: integer(), + timer :: reference() +}). + + +-spec start_link(module(), term(), integer(), integer()) -> + {ok, pid()} | ignore | {error, term()}. +start_link(Module, Context, StartPeriod, Period) + when is_atom(Module), is_integer(StartPeriod), is_integer(Period) -> + gen_server:start_link(?MODULE, [Module, Context, StartPeriod, Period], []). + + +-spec set_period(pid(), integer()) -> ok. +set_period(Server, Period) when is_pid(Server), is_integer(Period) -> + gen_server:cast(Server, {set_period, Period}). + + +% gen_server callbacks + +init([Module, Context, StartPeriod, Period]) -> + net_kernel:monitor_nodes(true), + {ok, #state{ + mod = Module, + ctx = Context, + start_time = os:timestamp(), + last_change = os:timestamp(), + period = Period, + start_period = StartPeriod, + timer = new_timer(StartPeriod) + }}. + + +terminate(_Reason, _State) -> + ok. + +handle_call(_Msg, _From, State) -> + {reply, ignored, State}. + + +handle_cast({set_period, Period}, State) -> + {noreply, State#state{period = Period}}. + + +handle_info({nodeup, _Node}, State) -> + {noreply, cluster_changed(State)}; + +handle_info({nodedown, _Node}, State) -> + {noreply, cluster_changed(State)}; + +handle_info(stability_check, #state{mod = Mod, ctx = Ctx} = State) -> + erlang:cancel_timer(State#state.timer), + case now_diff_sec(State#state.last_change) > interval(State) of + true -> + {noreply, State#state{ctx = Mod:cluster_stable(Ctx)}}; + false -> + Timer = new_timer(interval(State)), + {noreply, State#state{timer = Timer}} + end. + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +%% Internal functions + +-spec cluster_changed(#state{}) -> #state{}. +cluster_changed(#state{mod = Mod, ctx = Ctx} = State) -> + State#state{ + last_change = os:timestamp(), + timer = new_timer(interval(State)), + ctx = Mod:cluster_unstable(Ctx) + }. + + +-spec new_timer(non_neg_integer()) -> reference(). +new_timer(IntervalSec) -> + erlang:send_after(IntervalSec * 1000, self(), stability_check). + + +% For the first Period seconds after node boot we check cluster stability every +% StartPeriod seconds. Once the initial Period seconds have passed we continue +% to monitor once every Period seconds +-spec interval(#state{}) -> non_neg_integer(). +interval(#state{period = Period, start_period = StartPeriod, + start_time = T0}) -> + case now_diff_sec(T0) > Period of + true -> + % Normal operation + Period; + false -> + % During startup + StartPeriod + end. + + +-spec now_diff_sec(erlang:timestamp()) -> non_neg_integer(). +now_diff_sec(Time) -> + case timer:now_diff(os:timestamp(), Time) of + USec when USec < 0 -> + 0; + USec when USec >= 0 -> + USec / 1000000 + end. diff --git a/src/mem3/src/mem3_httpd.erl b/src/mem3/src/mem3_httpd.erl index 535815862..571f06370 100644 --- a/src/mem3/src/mem3_httpd.erl +++ b/src/mem3/src/mem3_httpd.erl @@ -32,7 +32,7 @@ handle_membership_req(#httpd{path_parts=[<<"_membership">>]}=Req) -> handle_shards_req(#httpd{method='GET', path_parts=[_DbName, <<"_shards">>]} = Req, Db) -> - DbName = mem3:dbname(Db#db.name), + DbName = mem3:dbname(couch_db:name(Db)), Shards = mem3:shards(DbName), JsonShards = json_shards(Shards, dict:new()), couch_httpd:send_json(Req, {[ @@ -40,7 +40,7 @@ handle_shards_req(#httpd{method='GET', ]}); handle_shards_req(#httpd{method='GET', path_parts=[_DbName, <<"_shards">>, DocId]} = Req, Db) -> - DbName = mem3:dbname(Db#db.name), + DbName = mem3:dbname(couch_db:name(Db)), Shards = mem3:shards(DbName, DocId), {[{Shard, Dbs}]} = json_shards(Shards, dict:new()), couch_httpd:send_json(Req, {[ diff --git a/src/mem3/src/mem3_nodes.erl b/src/mem3/src/mem3_nodes.erl index f31891a7b..555389b90 100644 --- a/src/mem3/src/mem3_nodes.erl +++ b/src/mem3/src/mem3_nodes.erl @@ -92,7 +92,7 @@ code_change(_OldVsn, #state{}=State, _Extra) -> initialize_nodelist() -> DbName = config:get("mem3", "nodes_db", "_nodes"), {ok, Db} = mem3_util:ensure_exists(DbName), - {ok, _, Db} = couch_btree:fold(Db#db.id_tree, fun first_fold/3, Db, []), + {ok, _} = couch_db:fold_docs(Db, fun first_fold/2, Db, []), % add self if not already present case ets:lookup(?MODULE, node()) of [_] -> @@ -103,13 +103,13 @@ initialize_nodelist() -> {ok, _} = couch_db:update_doc(Db, Doc, []) end, couch_db:close(Db), - Db#db.update_seq. + couch_db:get_update_seq(Db). -first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, _, Acc) -> +first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) -> {ok, Acc}; -first_fold(#full_doc_info{deleted=true}, _, Acc) -> +first_fold(#full_doc_info{deleted=true}, Acc) -> {ok, Acc}; -first_fold(#full_doc_info{id=Id}=DocInfo, _, Db) -> +first_fold(#full_doc_info{id=Id}=DocInfo, Db) -> {ok, #doc{body={Props}}} = couch_db:open_doc(Db, DocInfo, [ejson_body]), ets:insert(?MODULE, {mem3_util:to_atom(Id), Props}), {ok, Db}. diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl index 826604ab1..e178fad6d 100644 --- a/src/mem3/src/mem3_rep.erl +++ b/src/mem3/src/mem3_rep.erl @@ -170,16 +170,15 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) -> end. -repl(#db{name=DbName, seq_tree=Bt}=Db, Acc0) -> - erlang:put(io_priority, {internal_repl, DbName}), +repl(Db, Acc0) -> + erlang:put(io_priority, {internal_repl, couch_db:name(Db)}), #acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0#acc{source = Db}), case Seq >= couch_db:get_update_seq(Db) of true -> {ok, 0}; false -> Fun = fun ?MODULE:changes_enumerator/3, - FoldOpts = [{start_key, Seq + 1}], - {ok, _, Acc2} = couch_btree:fold(Bt, Fun, Acc1, FoldOpts), + {ok, _, Acc2} = couch_db:enum_docs_since(Db, Seq, Fun, Acc1, []), {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2), {ok, couch_db:count_changes_since(Db, LastSeq)} end. @@ -343,7 +342,7 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) -> SrcUUID = couch_db:get_uuid(SrcDb), S = couch_util:encodeBase64Url(crypto:hash(md5, term_to_binary(SrcUUID))), DocIdPrefix = <<"_local/shard-sync-", S/binary, "-">>, - FoldFun = fun({DocId, {Rev0, {BodyProps}}}, _, _) -> + FoldFun = fun({DocId, {Rev0, {BodyProps}}}, _) -> TgtUUID = couch_util:get_value(<<"target_uuid">>, BodyProps, <<>>), case is_prefix(DocIdPrefix, DocId) of true -> @@ -360,10 +359,10 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) -> end end, Options = [{start_key, DocIdPrefix}], - case couch_btree:fold(SrcDb#db.local_tree, FoldFun, not_found, Options) of - {ok, _, {TgtUUID, Doc}} -> + case couch_db:fold_local_docs(SrcDb, FoldFun, not_found, Options) of + {ok, {TgtUUID, Doc}} -> {ok, TgtUUID, Doc}; - {ok, _, not_found} -> + {ok, not_found} -> {not_found, missing}; Else -> couch_log:error("Error finding replication doc: ~w", [Else]), diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl index 93cb99ac9..c2bd58fdf 100644 --- a/src/mem3/src/mem3_rpc.erl +++ b/src/mem3/src/mem3_rpc.erl @@ -84,11 +84,11 @@ load_checkpoint_rpc(DbName, SourceNode, SourceUUID) -> save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) -> erlang:put(io_priority, {internal_repl, DbName}), case get_or_create_db(DbName, [?ADMIN_CTX]) of - {ok, #db{update_seq = TargetSeq} = Db} -> + {ok, Db} -> NewEntry = {[ {<<"target_node">>, atom_to_binary(node(), utf8)}, {<<"target_uuid">>, couch_db:get_uuid(Db)}, - {<<"target_seq">>, TargetSeq} + {<<"target_seq">>, couch_db:get_update_seq(Db)} ] ++ NewEntry0}, Body = {[ {<<"seq">>, SourceSeq}, diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl index 8d9cfb9c7..be7e5aaaf 100644 --- a/src/mem3/src/mem3_shards.erl +++ b/src/mem3/src/mem3_shards.erl @@ -323,7 +323,7 @@ get_update_seq() -> DbName = config:get("mem3", "shards_db", "_dbs"), {ok, Db} = mem3_util:ensure_exists(DbName), couch_db:close(Db), - Db#db.update_seq. + couch_db:get_update_seq(Db). listen_for_changes(Since) -> DbName = config:get("mem3", "shards_db", "_dbs"), @@ -380,7 +380,7 @@ load_shards_from_disk(DbName) when is_binary(DbName) -> couch_db:close(Db) end. -load_shards_from_db(#db{} = ShardDb, DbName) -> +load_shards_from_db(ShardDb, DbName) -> case couch_db:open_doc(ShardDb, DbName, [ejson_body]) of {ok, #doc{body = {Props}}} -> Seq = couch_db:get_update_seq(ShardDb), @@ -659,7 +659,7 @@ t_spawn_writer_in_load_shards_from_db() -> meck:expect(couch_db, get_update_seq, 1, 1), meck:expect(mem3_util, build_ordered_shards, 2, mock_shards()), erlang:register(?MODULE, self()), % register to get cache_insert cast - load_shards_from_db(#db{name = <<"testdb">>}, ?DB), + load_shards_from_db(test_util:fake_db([{name, <<"testdb">>}]), ?DB), meck:validate(couch_db), meck:validate(mem3_util), Cast = receive @@ -746,8 +746,8 @@ mem3_shards_changes_test_() -> { setup_changes() -> - ok = meck:expect(mem3_util, ensure_exists, ['_'], - {ok, #db{name = <<"dbs">>, update_seq = 0}}), + RespDb = test_util:fake_db([{name, <<"dbs">>}, {update_seq, 0}]), + ok = meck:expect(mem3_util, ensure_exists, ['_'], {ok, RespDb}), ok = meck:expect(couch_db, close, ['_'], ok), ok = application:start(config), {ok, Pid} = ?MODULE:start_link(), diff --git a/src/mem3/test/mem3_cluster_test.erl b/src/mem3/test/mem3_cluster_test.erl new file mode 100644 index 000000000..4610d64bd --- /dev/null +++ b/src/mem3/test/mem3_cluster_test.erl @@ -0,0 +1,133 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_cluster_test). + +-behavior(mem3_cluster). + +-include_lib("eunit/include/eunit.hrl"). + +-export([ + cluster_unstable/1, + cluster_stable/1 +]). + + +% Mem3 cluster callbacks + +cluster_unstable(Server) -> + Server ! cluster_unstable, + Server. + +cluster_stable(Server) -> + Server ! cluster_stable, + Server. + + +mem3_cluster_test_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + [ + t_cluster_stable_during_startup_period(), + t_cluster_unstable_delivered_on_nodeup(), + t_cluster_unstable_delivered_on_nodedown(), + t_wait_period_is_reset_after_last_change() + ] + }. + + +t_cluster_stable_during_startup_period() -> + ?_test(begin + {ok, Pid} = mem3_cluster:start_link(?MODULE, self(), 1, 2), + register(?MODULE, Pid), + receive + cluster_stable -> + ?assert(true) + after 1500 -> + throw(timeout) + end, + unlink(Pid), + exit(Pid, kill) + end). + + +t_cluster_unstable_delivered_on_nodeup() -> + ?_test(begin + {ok, Pid} = mem3_cluster:start_link(?MODULE, self(), 1, 2), + register(?MODULE, Pid), + Pid ! {nodeup, node()}, + receive + cluster_unstable -> + ?assert(true) + after 1000 -> + throw(timeout) + end, + unlink(Pid), + exit(Pid, kill) + end). + + +t_cluster_unstable_delivered_on_nodedown() -> + ?_test(begin + {ok, Pid} = mem3_cluster:start_link(?MODULE, self(), 1, 2), + register(?MODULE, Pid), + Pid ! {nodedown, node()}, + receive + cluster_unstable -> + ?assert(true) + after 1000 -> + throw(timeout) + end, + unlink(Pid), + exit(Pid, kill) + end). + + +t_wait_period_is_reset_after_last_change() -> + ?_test(begin + {ok, Pid} = mem3_cluster:start_link(?MODULE, self(), 1, 1), + register(?MODULE, Pid), + timer:sleep(800), + Pid ! {nodeup, node()}, % after 800 sec send a nodeup + receive + cluster_stable -> + ?assert(false) + after 400 -> + ?assert(true) % stability check should have been reset + end, + timer:sleep(1000), + receive + cluster_stable -> + ?assert(true) + after 0 -> + ?assert(false) % cluster_stable arrives after enough quiet time + end, + unlink(Pid), + exit(Pid, kill) + end). + + +% Test helper functions + +setup() -> + ok. + +teardown(_) -> + case whereis(?MODULE) of + undefined -> + ok; + Pid when is_pid(Pid) -> + unlink(Pid), + exit(Pid, kill) + end. diff --git a/src/rexi/src/rexi_server_mon.erl b/src/rexi/src/rexi_server_mon.erl index e6b5eb98e..86fecaff6 100644 --- a/src/rexi/src/rexi_server_mon.erl +++ b/src/rexi/src/rexi_server_mon.erl @@ -14,6 +14,7 @@ -module(rexi_server_mon). -behaviour(gen_server). +-behaviour(mem3_cluster). -vsn(1). @@ -32,8 +33,13 @@ code_change/3 ]). +-export([ + cluster_stable/1, + cluster_unstable/1 +]). --define(INTERVAL, 60000). + +-define(CLUSTER_STABILITY_PERIOD_SEC, 15). start_link(ChildMod) -> @@ -45,9 +51,23 @@ status() -> gen_server:call(?MODULE, status). +% Mem3 cluster callbacks + +cluster_unstable(Server) -> + couch_log:notice("~s : cluster unstable", [?MODULE]), + gen_server:cast(Server, cluster_unstable), + Server. + +cluster_stable(Server) -> + gen_server:cast(Server, cluster_stable), + Server. + + +% gen_server callbacks + init(ChildMod) -> - net_kernel:monitor_nodes(true), - erlang:send(self(), check_nodes), + {ok, _Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(), + ?CLUSTER_STABILITY_PERIOD_SEC, ?CLUSTER_STABILITY_PERIOD_SEC), {ok, ChildMod}. @@ -67,24 +87,27 @@ handle_call(Msg, _From, St) -> couch_log:notice("~s ignored_call ~w", [?MODULE, Msg]), {reply, ignored, St}. - -handle_cast(Msg, St) -> - couch_log:notice("~s ignored_cast ~w", [?MODULE, Msg]), - {noreply, St}. - - -handle_info({nodeup, _}, ChildMod) -> +% If cluster is unstable a node was added or just removed. Check if any nodes +% can be started, but do not immediately stop nodes, defer that till cluster +% stabilized. +handle_cast(cluster_unstable, ChildMod) -> + couch_log:notice("~s : cluster unstable", [ChildMod]), start_servers(ChildMod), {noreply, ChildMod}; -handle_info({nodedown, _}, St) -> - {noreply, St}; - -handle_info(check_nodes, ChildMod) -> +% When cluster is stable, start any servers for new nodes and stop servers for +% the ones that disconnected. +handle_cast(cluster_stable, ChildMod) -> + couch_log:notice("~s : cluster stable", [ChildMod]), start_servers(ChildMod), - erlang:send_after(?INTERVAL, self(), check_nodes), + stop_servers(ChildMod), {noreply, ChildMod}; +handle_cast(Msg, St) -> + couch_log:notice("~s ignored_cast ~w", [?MODULE, Msg]), + {noreply, St}. + + handle_info(Msg, St) -> couch_log:notice("~s ignored_info ~w", [?MODULE, Msg]), {noreply, St}. @@ -101,13 +124,27 @@ start_servers(ChildMod) -> {ok, _} = start_server(ChildMod, Id) end, missing_servers(ChildMod)). +stop_servers(ChildMod) -> + lists:foreach(fun(Id) -> + ok = stop_server(ChildMod, Id) + end, extra_servers(ChildMod)). + + +server_ids(ChildMod) -> + Nodes = [node() | nodes()], + [list_to_atom(lists:concat([ChildMod, "_", Node])) || Node <- Nodes]. + + +running_servers(ChildMod) -> + [Id || {Id, _, _, _} <- supervisor:which_children(sup_module(ChildMod))]. + missing_servers(ChildMod) -> - ServerIds = [list_to_atom(lists:concat([ChildMod, "_", Node])) - || Node <- [node() | nodes()]], - SupModule = sup_module(ChildMod), - ChildIds = [Id || {Id, _, _, _} <- supervisor:which_children(SupModule)], - ServerIds -- ChildIds. + server_ids(ChildMod) -- running_servers(ChildMod). + + +extra_servers(ChildMod) -> + running_servers(ChildMod) -- server_ids(ChildMod). start_server(ChildMod, ChildId) -> @@ -126,5 +163,12 @@ start_server(ChildMod, ChildId) -> erlang:error(Else) end. + +stop_server(ChildMod, ChildId) -> + SupMod = sup_module(ChildMod), + ok = supervisor:terminate_child(SupMod, ChildId), + ok = supervisor:delete_child(SupMod, ChildId). + + sup_module(ChildMod) -> list_to_atom(lists:concat([ChildMod, "_sup"])). |