summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJulian <almightyju@arandomworld.co.uk>2017-09-29 17:50:57 +0100
committerGitHub <noreply@github.com>2017-09-29 17:50:57 +0100
commit796e1b039f803c9e7d8785a4b54f2cb3c9be8528 (patch)
treea2136649cd41f8a31139e3e02ed124c090e9c456
parentcacc7747efaf924712a1738c2cf46dbfcdb6bc60 (diff)
parent9751b067748e6fa0f15741613d95eb4737adf75f (diff)
downloadcouchdb-796e1b039f803c9e7d8785a4b54f2cb3c9be8528.tar.gz
Merge branch 'master' into master
-rw-r--r--.travis.yml2
-rw-r--r--Makefile1
-rw-r--r--README-DEV.rst33
-rwxr-xr-xconfigure2
-rw-r--r--rebar.config.script4
-rwxr-xr-xrel/overlay/bin/remsh76
-rw-r--r--rel/overlay/etc/default.ini6
-rw-r--r--src/chttpd/src/chttpd_db.erl21
-rw-r--r--src/chttpd/src/chttpd_external.erl22
-rw-r--r--src/chttpd/src/chttpd_show.erl3
-rw-r--r--src/couch/include/couch_db.hrl34
-rw-r--r--src/couch/src/couch_att.erl3
-rw-r--r--src/couch/src/couch_auth_cache.erl14
-rw-r--r--src/couch/src/couch_changes.erl56
-rw-r--r--src/couch/src/couch_compaction_daemon.erl4
-rw-r--r--src/couch/src/couch_compress.erl14
-rw-r--r--src/couch/src/couch_db.erl307
-rw-r--r--src/couch/src/couch_db_int.hrl93
-rw-r--r--src/couch/src/couch_db_plugin.erl6
-rw-r--r--src/couch/src/couch_db_updater.erl10
-rw-r--r--src/couch/src/couch_httpd_db.erl12
-rw-r--r--src/couch/src/couch_lru.erl9
-rw-r--r--src/couch/src/couch_server.erl135
-rw-r--r--src/couch/src/couch_server_int.hrl23
-rw-r--r--src/couch/src/couch_users_db.erl8
-rw-r--r--src/couch/src/couch_util.erl15
-rw-r--r--src/couch/src/test_util.erl14
-rw-r--r--src/couch/test/couch_auth_cache_tests.erl2
-rw-r--r--src/couch/test/couch_changes_tests.erl2
-rw-r--r--src/couch/test/couch_compress_tests.erl11
-rw-r--r--src/couch/test/couch_db_plugin_tests.erl13
-rw-r--r--src/couch/test/couch_server_tests.erl11
-rw-r--r--src/couch/test/couchdb_compaction_daemon_tests.erl4
-rw-r--r--src/couch/test/couchdb_file_compression_tests.erl2
-rw-r--r--src/couch/test/couchdb_views_tests.erl25
-rw-r--r--src/couch_index/src/couch_index_server.erl15
-rw-r--r--src/couch_index/src/couch_index_util.erl2
-rw-r--r--src/couch_index/test/couch_index_compaction_tests.erl3
-rw-r--r--src/couch_index/test/couch_index_ddoc_updated_tests.erl2
-rw-r--r--src/couch_mrview/src/couch_mrview.erl18
-rw-r--r--src/couch_mrview/src/couch_mrview_compactor.erl3
-rw-r--r--src/couch_mrview/src/couch_mrview_http.erl10
-rw-r--r--src/couch_mrview/src/couch_mrview_show.erl16
-rw-r--r--src/couch_mrview/test/couch_mrview_all_docs_tests.erl2
-rw-r--r--src/couch_mrview/test/couch_mrview_changes_since_tests.erl2
-rw-r--r--src/couch_mrview/test/couch_mrview_collation_tests.erl2
-rw-r--r--src/couch_mrview/test/couch_mrview_compact_tests.erl2
-rw-r--r--src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl2
-rw-r--r--src/couch_mrview/test/couch_mrview_ddoc_validation_tests.erl2
-rw-r--r--src/couch_mrview/test/couch_mrview_index_changes_tests.erl2
-rw-r--r--src/couch_mrview/test/couch_mrview_index_info_tests.erl2
-rw-r--r--src/couch_mrview/test/couch_mrview_local_docs_tests.erl2
-rw-r--r--src/couch_mrview/test/couch_mrview_map_views_tests.erl2
-rw-r--r--src/couch_mrview/test/couch_mrview_red_views_tests.erl2
-rw-r--r--src/couch_replicator/src/couch_replicator_api_wrap.erl27
-rw-r--r--src/couch_replicator/src/couch_replicator_api_wrap.hrl2
-rw-r--r--src/couch_replicator/src/couch_replicator_clustering.erl116
-rw-r--r--src/couch_replicator/src/couch_replicator_docs.erl19
-rw-r--r--src/couch_replicator/src/couch_replicator_ids.erl16
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler_job.erl200
-rw-r--r--src/couch_replicator/src/couch_replicator_utils.erl37
-rw-r--r--src/couch_replicator/src/couch_replicator_worker.erl56
-rw-r--r--src/couch_replicator/test/couch_replicator_compact_tests.erl27
-rw-r--r--src/couch_stats/src/couch_stats.app.src4
-rw-r--r--src/couch_stats/src/couch_stats.erl6
-rw-r--r--src/couch_stats/src/couch_stats.hrl14
-rw-r--r--src/couch_stats/src/couch_stats_aggregator.erl11
-rw-r--r--src/fabric/include/couch_db_tmp.hrl296
-rw-r--r--src/fabric/rebar.config2
-rw-r--r--src/fabric/src/fabric.erl12
-rw-r--r--src/fabric/src/fabric_db_info.erl48
-rw-r--r--src/fabric/src/fabric_rpc.erl123
-rw-r--r--src/fabric/src/fabric_util.erl3
-rw-r--r--src/mango/src/mango_crud.erl2
-rw-r--r--src/mango/src/mango_cursor.erl8
-rw-r--r--src/mango/src/mango_cursor_text.erl4
-rw-r--r--src/mango/src/mango_httpd.erl3
-rw-r--r--src/mango/src/mango_idx.erl67
-rw-r--r--src/mango/src/mango_idx_text.erl17
-rw-r--r--src/mango/src/mango_idx_view.erl15
-rw-r--r--src/mango/src/mango_native_proc.erl23
-rw-r--r--src/mango/src/mango_selector.erl110
-rw-r--r--src/mango/test/01-index-crud-test.py21
-rw-r--r--src/mango/test/02-basic-find-test.py14
-rw-r--r--src/mango/test/03-operator-test.py6
-rw-r--r--src/mango/test/05-index-selection-test.py80
-rw-r--r--src/mango/test/06-basic-text-test.py3
-rw-r--r--src/mango/test/07-text-custom-field-list-test.py4
-rw-r--r--src/mango/test/08-text-limit-test.py6
-rw-r--r--src/mango/test/09-text-sort-test.py4
-rw-r--r--src/mango/test/14-json-pagination.py4
-rw-r--r--src/mango/test/16-index-selectors.py111
-rw-r--r--src/mango/test/mango.py16
-rw-r--r--src/mango/test/user_docs.py1
-rw-r--r--src/mem3/src/mem3.erl17
-rw-r--r--src/mem3/src/mem3_cluster.erl161
-rw-r--r--src/mem3/src/mem3_httpd.erl4
-rw-r--r--src/mem3/src/mem3_nodes.erl10
-rw-r--r--src/mem3/src/mem3_rep.erl15
-rw-r--r--src/mem3/src/mem3_rpc.erl4
-rw-r--r--src/mem3/src/mem3_shards.erl10
-rw-r--r--src/mem3/test/mem3_cluster_test.erl133
-rw-r--r--src/rexi/src/rexi_server_mon.erl84
103 files changed, 2035 insertions, 1029 deletions
diff --git a/.travis.yml b/.travis.yml
index 56a2b7d71..8aebaabc6 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -58,7 +58,7 @@ before_script:
- cd ../..
script:
- - make check mango-test
+ - make check
after_failure:
- build-aux/logfile-uploader.py
diff --git a/Makefile b/Makefile
index c1dd3a9cf..239a2db1c 100644
--- a/Makefile
+++ b/Makefile
@@ -91,6 +91,7 @@ fauxton: share/www
check: all
@$(MAKE) eunit
@$(MAKE) javascript
+ @$(MAKE) mango-test
# @$(MAKE) build-test
diff --git a/README-DEV.rst b/README-DEV.rst
index 73c684cd2..0f12fa44c 100644
--- a/README-DEV.rst
+++ b/README-DEV.rst
@@ -12,20 +12,29 @@ If you're unsure what this means, ignore this document.
Dependencies
------------
-You may need:
+You need the following to run tests:
+
+* `Python <https://www.python.org/>`_
+* `nose <https://nose.readthedocs.io/en/latest/>`_
+* `requests <http://docs.python-requests.org/>`_
+* `hypothesis <https://pypi.python.org/pypi/hypothesis>`_
+
+You need the following optionally to build documentation:
* `Sphinx <http://sphinx.pocoo.org/>`_
* `GNU help2man <http://www.gnu.org/software/help2man/>`_
* `GnuPG <http://www.gnupg.org/>`_
+
+You need the following optionally to build releases:
+
* `md5sum <http://www.microbrew.org/tools/md5sha1sum/>`_
* `sha1sum <http://www.microbrew.org/tools/md5sha1sum/>`_
+
+You need the following optionally to build Fauxton:
+
* `nodejs <http://nodejs.org/>`_
* `npm <https://www.npmjs.com/>`_
-The first four of these optional dependencies are required for building the
-documentation. The next three are needed to build releases. The last two are for
-needed to build fauxton.
-
You will need these optional dependencies installed if:
* You are working on the documentation, or
@@ -50,14 +59,16 @@ Debian-based (inc. Ubuntu) Systems
::
- sudo apt-get install help2man python-sphinx gnupg nodejs npm
+ sudo apt-get install help2man python-sphinx gnupg nodejs npm \
+ python-hypothesis python-requests python-nose
Gentoo-based Systems
~~~~~~~~~~~~~~~~~~~~
::
- sudo emerge gnupg coreutils pkgconfig help2man sphinx
+ sudo emerge gnupg coreutils pkgconfig help2man sphinx python
+ sudo pip install hypothesis requests nose
RedHat-based (Fedora, Centos, RHEL) Systems
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -65,7 +76,8 @@ RedHat-based (Fedora, Centos, RHEL) Systems
::
sudo yum install help2man python-sphinx python-docutils \
- python-pygments gnupg nodejs npm
+ python-pygments gnupg nodejs npm python-nose python-requests \
+ python-hypothesis
Mac OS X
~~~~~~~~
@@ -85,9 +97,7 @@ If you don't already have pip installed, install it::
Now, install the required Python packages::
- sudo pip install sphinx
- sudo pip install docutils
- sudo pip install pygments
+ sudo pip install sphinx docutils pygments nose requests hypothesis
FreeBSD
~~~~~~~
@@ -95,6 +105,7 @@ FreeBSD
::
pkg install help2man gnupg py27-sphinx node
+ pip install nose requests hypothesis
Windows
~~~~~~~
diff --git a/configure b/configure
index 514551091..412341208 100755
--- a/configure
+++ b/configure
@@ -156,7 +156,7 @@ cat > rel/couchdb.config << EOF
{log_file, "$LOG_FILE"}.
{fauxton_root, "./share/www"}.
{user, "$COUCHDB_USER"}.
-{node_name, "-name couchdb@localhost"}.
+{node_name, "-name couchdb@127.0.0.1"}.
{cluster_port, 5984}.
{backend_port, 5986}.
EOF
diff --git a/rebar.config.script b/rebar.config.script
index 654fb2f12..61c34fd00 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -60,11 +60,11 @@ DepDescs = [
{fauxton, {url, "https://github.com/apache/couchdb-fauxton"},
{tag, "v1.1.13"}, [raw]},
%% Third party deps
-{folsom, "folsom", {tag, "CouchDB-0.8.1"}},
+{folsom, "folsom", {tag, "CouchDB-0.8.2"}},
{ibrowse, "ibrowse", {tag, "CouchDB-4.0.1"}},
{jiffy, "jiffy", {tag, "CouchDB-0.14.11-1"}},
{mochiweb, "mochiweb", {tag, "CouchDB-2.12.0-1"}},
-{meck, "meck", {tag, "0.8.2"}}
+{meck, "meck", {tag, "0.8.8"}}
],
diff --git a/rel/overlay/bin/remsh b/rel/overlay/bin/remsh
new file mode 100755
index 000000000..963c16a10
--- /dev/null
+++ b/rel/overlay/bin/remsh
@@ -0,0 +1,76 @@
+#!/bin/sh
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+COUCHDB_BIN_DIR=$(cd "${0%/*}" && pwd)
+ERTS_BIN_DIR=$COUCHDB_BIN_DIR/../
+ROOTDIR=${ERTS_BIN_DIR%/*}
+START_ERL=$(cat "$ROOTDIR/releases/start_erl.data")
+ERTS_VSN=${START_ERL% *}
+APP_VSN=${START_ERL#* }
+BINDIR=$ROOTDIR/erts-$ERTS_VSN/bin
+
+PROGNAME=${0##*/}
+VERBOSE=""
+NODE="couchdb@127.0.0.1"
+COOKIE=monster
+LHOST=127.0.0.1
+
+printHelpAndExit() {
+ echo "Usage: ${PROGNAME} [OPTION]... [-- <additional Erlang cli options>]"
+ echo " -c cookie specify shared Erlang cookie (default: monster)"
+ echo " -l HOST specify remsh's host name (default: 127.0.0.1)"
+ echo " -m use output of \`hostname -f\` as remsh's host name"
+ echo " -n NAME@HOST specify couchdb's Erlang node name (-name in vm.args)"
+ echo " -v verbose; print invocation line"
+ echo " -h this help message"
+ exit
+}
+
+while getopts ":hn:c:l:mv" optionName; do
+ case "$optionName" in
+ h)
+ printHelpAndExit 0
+ ;;
+ n)
+ NODE=$OPTARG
+ ;;
+ c)
+ COOKIE=$OPTARG
+ ;;
+ l)
+ LHOST=$OPTARG
+ ;;
+ m)
+ LHOST=$(hostname -f)
+ ;;
+ v)
+ VERBOSE=0
+ ;;
+ \?)
+ echo "Invalid option: -$OPTARG" >&2
+ printHelpAndExit 0
+ ;;
+ esac
+done
+
+shift $((OPTIND - 1))
+
+if [ ! -z "$VERBOSE" ]; then
+ # cheap but it works
+ set -x
+fi
+
+exec "$BINDIR/erl" -boot "$ROOTDIR/releases/$APP_VSN/start_clean" \
+ -name remsh$$@$LHOST -remsh $NODE -hidden -setcookie $COOKIE \
+ "$@"
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 27a952c74..122853542 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -370,7 +370,7 @@ connection_timeout = 30000
; Request timeout
;request_timeout = infinity
; If a request fails, the replicator will retry it up to N times.
-retries_per_request = 10
+retries_per_request = 5
; Use checkpoints
;use_checkpoints = true
; Checkpoint interval
@@ -542,3 +542,7 @@ writer = stderr
; syslog_port = 514
; syslog_appid = couchdb
; syslog_facility = local2
+
+[stats]
+; Stats collection interval in seconds. Default 10 seconds.
+;interval = 10
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index a7796fcdf..c8826d581 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -84,7 +84,7 @@ handle_changes_req1(#httpd{}=Req, Db) ->
#changes_args{filter=Raw, style=Style} = Args0 = parse_changes_query(Req),
ChangesArgs = Args0#changes_args{
filter_fun = couch_changes:configure_filter(Raw, Style, Req, Db),
- db_open_options = [{user_ctx, Db#db.user_ctx}]
+ db_open_options = [{user_ctx, couch_db:get_user_ctx(Db)}]
},
Max = chttpd:chunked_response_buffer_size(),
case ChangesArgs#changes_args.feed of
@@ -253,7 +253,7 @@ handle_view_cleanup_req(Req, Db) ->
handle_design_req(#httpd{
path_parts=[_DbName, _Design, Name, <<"_",_/binary>> = Action | _Rest]
}=Req, Db) ->
- DbName = mem3:dbname(Db#db.name),
+ DbName = mem3:dbname(couch_db:name(Db)),
case ddoc_cache:open(DbName, <<"_design/", Name/binary>>) of
{ok, DDoc} ->
Handler = chttpd_handlers:design_handler(Action, fun bad_action_req/3),
@@ -309,7 +309,8 @@ delete_db_req(#httpd{}=Req, DbName) ->
do_db_req(#httpd{path_parts=[DbName|_], user_ctx=Ctx}=Req, Fun) ->
fabric:get_security(DbName, [{user_ctx,Ctx}]), % calls check_is_reader
- Fun(Req, #db{name=DbName, user_ctx=Ctx}).
+ {ok, Db} = couch_db:clustered_db(DbName, Ctx),
+ Fun(Req, Db).
db_req(#httpd{method='GET',path_parts=[DbName]}=Req, _Db) ->
% measure the time required to generate the etag, see if it's worth it
@@ -767,16 +768,17 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) ->
} = parse_doc_query(Req),
couch_doc:validate_docid(DocId),
+ DbName = couch_db:name(Db),
W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
Options = [{user_ctx,Ctx}, {w,W}],
- Loc = absolute_uri(Req, [$/, couch_util:url_encode(Db#db.name),
+ Loc = absolute_uri(Req, [$/, couch_util:url_encode(DbName),
$/, couch_util:url_encode(DocId)]),
RespHeaders = [{"Location", Loc}],
case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of
("multipart/related;" ++ _) = ContentType ->
couch_httpd:check_max_request_length(Req),
- couch_httpd_multipart:num_mp_writers(mem3:n(mem3:dbname(Db#db.name), DocId)),
+ couch_httpd_multipart:num_mp_writers(mem3:n(mem3:dbname(DbName), DocId)),
{ok, Doc0, WaitFun, Parser} = couch_doc:doc_from_multi_part_stream(ContentType,
fun() -> receive_request_data(Req) end),
Doc = couch_doc_from_req(Req, DocId, Doc0),
@@ -833,8 +835,9 @@ db_doc_req(#httpd{method='COPY', user_ctx=Ctx}=Req, Db, SourceDocId) ->
HttpCode = 202
end,
% respond
+ DbName = couch_db:name(Db),
{PartRes} = update_doc_result_to_json(TargetDocId, {ok, NewTargetRev}),
- Loc = absolute_uri(Req, "/" ++ couch_util:url_encode(Db#db.name) ++ "/" ++ couch_util:url_encode(TargetDocId)),
+ Loc = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName) ++ "/" ++ couch_util:url_encode(TargetDocId)),
send_json(Req, HttpCode,
[{"Location", Loc},
{"ETag", "\"" ++ ?b2l(couch_doc:rev_to_str(NewTargetRev)) ++ "\""}],
@@ -1057,8 +1060,8 @@ couch_doc_from_req(Req, DocId, Json) ->
% couch_doc_open(Db, DocId) ->
% couch_doc_open(Db, DocId, nil, []).
-couch_doc_open(#db{} = Db, DocId, Rev, Options0) ->
- Options = [{user_ctx, Db#db.user_ctx} | Options0],
+couch_doc_open(Db, DocId, Rev, Options0) ->
+ Options = [{user_ctx, couch_db:get_user_ctx(Db)} | Options0],
case Rev of
nil -> % open most recent rev
case fabric:open_doc(Db, DocId, Options) of
@@ -1262,7 +1265,7 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa
HttpCode = 202
end,
erlang:put(mochiweb_request_recv, true),
- #db{name=DbName} = Db,
+ DbName = couch_db:name(Db),
{Status, Headers} = case Method of
'DELETE' ->
diff --git a/src/chttpd/src/chttpd_external.erl b/src/chttpd/src/chttpd_external.erl
index 4abeecb37..64664b98e 100644
--- a/src/chttpd/src/chttpd_external.erl
+++ b/src/chttpd/src/chttpd_external.erl
@@ -120,16 +120,22 @@ json_req_obj_field(<<"secObj">>, #httpd{user_ctx=UserCtx}, Db, _DocId) ->
get_db_security(Db, UserCtx).
-get_db_info(#db{main_pid = nil} = Db) ->
- fabric:get_db_info(Db);
-get_db_info(#db{} = Db) ->
- couch_db:get_db_info(Db).
+get_db_info(Db) ->
+ case couch_db:is_clustered(Db) of
+ true ->
+ fabric:get_db_info(Db);
+ false ->
+ couch_db:get_db_info(Db)
+ end.
-get_db_security(#db{main_pid = nil}=Db, #user_ctx{}) ->
- fabric:get_security(Db);
-get_db_security(#db{}=Db, #user_ctx{}) ->
- couch_db:get_security(Db).
+get_db_security(Db, #user_ctx{}) ->
+ case couch_db:is_clustered(Db) of
+ true ->
+ fabric:get_security(Db);
+ false ->
+ couch_db:get_security(Db)
+ end.
to_json_terms(Data) ->
diff --git a/src/chttpd/src/chttpd_show.erl b/src/chttpd/src/chttpd_show.erl
index 48f14257e..c6d232c96 100644
--- a/src/chttpd/src/chttpd_show.erl
+++ b/src/chttpd/src/chttpd_show.erl
@@ -199,7 +199,8 @@ handle_view_list_req(Req, _Db, _DDoc) ->
handle_view_list(Req, Db, DDoc, LName, {ViewDesignName, ViewName}, Keys) ->
%% Will throw an exception if the _list handler is missing
couch_util:get_nested_json_value(DDoc#doc.body, [<<"lists">>, LName]),
- {ok, VDoc} = ddoc_cache:open(Db#db.name, <<"_design/", ViewDesignName/binary>>),
+ DbName = couch_db:name(Db),
+ {ok, VDoc} = ddoc_cache:open(DbName, <<"_design/", ViewDesignName/binary>>),
CB = fun couch_mrview_show:list_cb/2,
QueryArgs = couch_mrview_http:parse_params(Req, Keys),
Options = [{user_ctx, Req#httpd.user_ctx}],
diff --git a/src/couch/include/couch_db.hrl b/src/couch/include/couch_db.hrl
index 7049c6e5f..37f5792c3 100644
--- a/src/couch/include/couch_db.hrl
+++ b/src/couch/include/couch_db.hrl
@@ -30,12 +30,7 @@
-define(i2b(V), couch_util:integer_to_boolean(V)).
-define(b2i(V), couch_util:boolean_to_integer(V)).
-define(term_to_bin(T), term_to_binary(T, [{minor_version, 1}])).
--define(term_size(T),
- try
- erlang:external_size(T)
- catch _:_ ->
- byte_size(?term_to_bin(T))
- end).
+-define(term_size(T), erlang:external_size(T, [{minor_version, 1}])).
-define(DEFAULT_ATTACHMENT_CONTENT_TYPE, <<"application/octet-stream">>).
@@ -129,33 +124,6 @@
handler
}).
--record(db, {
- main_pid = nil,
- compactor_pid = nil,
- instance_start_time, % number of microsecs since jan 1 1970 as a binary string
- fd,
- fd_monitor,
- header = couch_db_header:new(),
- committed_update_seq,
- id_tree,
- seq_tree,
- local_tree,
- update_seq,
- name,
- filepath,
- validate_doc_funs = undefined,
- security = [],
- security_ptr = nil,
- user_ctx = #user_ctx{},
- waiting_delayed_commit = nil,
- revs_limit = 1000,
- fsync_options = [],
- options = [],
- compression,
- before_doc_update = nil, % nil | fun(Doc, Db) -> NewDoc
- after_doc_read = nil % nil | fun(Doc, Db) -> NewDoc
-}).
-
-record(view_fold_helper_funs, {
reduce_count,
passed_end,
diff --git a/src/couch/src/couch_att.erl b/src/couch/src/couch_att.erl
index 5c040a8c4..3380f5739 100644
--- a/src/couch/src/couch_att.erl
+++ b/src/couch/src/couch_att.erl
@@ -494,6 +494,9 @@ flush(Fd, Att) ->
flush_data(Fd, fetch(data, Att), Att).
+flush_data(Fd, {stream, {couch_bt_engine_stream, {OtherFd, StreamPointer}}},
+ Att) ->
+ flush_data(Fd, {OtherFd, StreamPointer}, Att);
flush_data(Fd, {Fd0, _}, Att) when Fd0 == Fd ->
% already written to our file, nothing to write
Att;
diff --git a/src/couch/src/couch_auth_cache.erl b/src/couch/src/couch_auth_cache.erl
index 1c4b86651..16c59d19a 100644
--- a/src/couch/src/couch_auth_cache.erl
+++ b/src/couch/src/couch_auth_cache.erl
@@ -322,13 +322,15 @@ refresh_entries(AuthDb) ->
nil ->
ok;
AuthDb2 ->
- case AuthDb2#db.update_seq > AuthDb#db.update_seq of
+ AuthDbSeq = couch_db:get_update_seq(AuthDb),
+ AuthDb2Seq = couch_db:get_update_seq(AuthDb2),
+ case AuthDb2Seq > AuthDbSeq of
true ->
{ok, _, _} = couch_db:enum_docs_since(
AuthDb2,
- AuthDb#db.update_seq,
+ AuthDbSeq,
fun(DocInfo, _, _) -> refresh_entry(AuthDb2, DocInfo) end,
- AuthDb#db.update_seq,
+ AuthDbSeq,
[]
),
true = ets:insert(?STATE, {auth_db, AuthDb2});
@@ -386,7 +388,9 @@ cache_needs_refresh() ->
nil ->
false;
AuthDb2 ->
- AuthDb2#db.update_seq > AuthDb#db.update_seq
+ AuthDbSeq = couch_db:get_update_seq(AuthDb),
+ AuthDb2Seq = couch_db:get_update_seq(AuthDb2),
+ AuthDb2Seq > AuthDbSeq
end
end,
false
@@ -407,7 +411,7 @@ exec_if_auth_db(Fun) ->
exec_if_auth_db(Fun, DefRes) ->
case ets:lookup(?STATE, auth_db) of
- [{auth_db, #db{} = AuthDb}] ->
+ [{auth_db, AuthDb}] ->
Fun(AuthDb);
_ ->
DefRes
diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl
index 26d272a88..3e4175014 100644
--- a/src/couch/src/couch_changes.erl
+++ b/src/couch/src/couch_changes.erl
@@ -79,9 +79,10 @@ handle_changes(Args1, Req, Db0, Type) ->
_ ->
{false, undefined, undefined}
end,
+ DbName = couch_db:name(Db0),
{StartListenerFun, View} = if UseViewChanges ->
{ok, {_, View0, _}, _, _} = couch_mrview_util:get_view(
- Db0#db.name, DDocName, ViewName, #mrargs{}),
+ DbName, DDocName, ViewName, #mrargs{}),
case View0#mrview.seq_btree of
#btree{} ->
ok;
@@ -90,14 +91,14 @@ handle_changes(Args1, Req, Db0, Type) ->
end,
SNFun = fun() ->
couch_event:link_listener(
- ?MODULE, handle_view_event, {self(), DDocName}, [{dbname, Db0#db.name}]
+ ?MODULE, handle_view_event, {self(), DDocName}, [{dbname, DbName}]
)
end,
{SNFun, View0};
true ->
SNFun = fun() ->
couch_event:link_listener(
- ?MODULE, handle_db_event, self(), [{dbname, Db0#db.name}]
+ ?MODULE, handle_db_event, self(), [{dbname, DbName}]
)
end,
{SNFun, undefined}
@@ -112,7 +113,7 @@ handle_changes(Args1, Req, Db0, Type) ->
end,
View2 = if UseViewChanges ->
{ok, {_, View1, _}, _, _} = couch_mrview_util:get_view(
- Db0#db.name, DDocName, ViewName, #mrargs{}),
+ DbName, DDocName, ViewName, #mrargs{}),
View1;
true ->
undefined
@@ -220,11 +221,11 @@ configure_filter("_view", Style, Req, Db) ->
catch _:_ ->
view
end,
- case Db#db.id_tree of
- undefined ->
+ case couch_db:is_clustered(Db) of
+ true ->
DIR = fabric_util:doc_id_and_rev(DDoc),
{fetch, FilterType, Style, DIR, VName};
- _ ->
+ false ->
{FilterType, Style, DDoc, VName}
end;
[] ->
@@ -243,11 +244,11 @@ configure_filter(FilterName, Style, Req, Db) ->
[DName, FName] ->
{ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
check_member_exists(DDoc, [<<"filters">>, FName]),
- case Db#db.id_tree of
- undefined ->
+ case couch_db:is_clustered(Db) of
+ true ->
DIR = fabric_util:doc_id_and_rev(DDoc),
{fetch, custom, Style, Req, DIR, FName};
- _ ->
+ false->
{custom, Style, Req, DDoc, FName}
end;
@@ -396,15 +397,19 @@ check_fields(_Fields) ->
throw({bad_request, "Selector error: fields must be JSON array"}).
-open_ddoc(#db{name=DbName, id_tree=undefined}, DDocId) ->
- case ddoc_cache:open_doc(mem3:dbname(DbName), DDocId) of
- {ok, _} = Resp -> Resp;
- Else -> throw(Else)
- end;
open_ddoc(Db, DDocId) ->
- case couch_db:open_doc(Db, DDocId, [ejson_body]) of
- {ok, _} = Resp -> Resp;
- Else -> throw(Else)
+ DbName = couch_db:name(Db),
+ case couch_db:is_clustered(Db) of
+ true ->
+ case ddoc_cache:open_doc(mem3:dbname(DbName), DDocId) of
+ {ok, _} = Resp -> Resp;
+ Else -> throw(Else)
+ end;
+ false ->
+ case couch_db:open_doc(Db, DDocId, [ejson_body]) of
+ {ok, _} = Resp -> Resp;
+ Else -> throw(Else)
+ end
end.
@@ -572,7 +577,7 @@ can_optimize(_, _) ->
send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) ->
- Lookups = couch_btree:lookup(Db#db.id_tree, DocIds),
+ Lookups = couch_db:get_full_doc_infos(Db, DocIds),
FullInfos = lists:foldl(fun
({ok, FDI}, Acc) -> [FDI | Acc];
(not_found, Acc) -> Acc
@@ -581,11 +586,9 @@ send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) ->
send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) ->
- FoldFun = fun(FullDocInfo, _, Acc) ->
- {ok, [FullDocInfo | Acc]}
- end,
+ FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
- {ok, _, FullInfos} = couch_btree:fold(Db#db.id_tree, FoldFun, [], KeyOpts),
+ {ok, FullInfos} = couch_db:fold_docs(Db, FoldFun, [], KeyOpts),
send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
@@ -646,8 +649,8 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
true ->
case wait_updated(Timeout, TimeoutFun, UserAcc2) of
{updated, UserAcc4} ->
- DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions],
- case couch_db:open(Db#db.name, DbOptions1) of
+ DbOptions1 = [{user_ctx, couch_db:get_user_ctx(Db)} | DbOptions],
+ case couch_db:open(couch_db:name(Db), DbOptions1) of
{ok, Db2} ->
?MODULE:keep_sending_changes(
Args#changes_args{limit=NewLimit},
@@ -671,7 +674,8 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
maybe_refresh_view(_, undefined, undefined) ->
undefined;
maybe_refresh_view(Db, DDocName, ViewName) ->
- {ok, {_, View, _}, _, _} = couch_mrview_util:get_view(Db#db.name, DDocName, ViewName, #mrargs{}),
+ DbName = couch_db:name(Db),
+ {ok, {_, View, _}, _, _} = couch_mrview_util:get_view(DbName, DDocName, ViewName, #mrargs{}),
View.
end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
diff --git a/src/couch/src/couch_compaction_daemon.erl b/src/couch/src/couch_compaction_daemon.erl
index 024b867d0..38e185da8 100644
--- a/src/couch/src/couch_compaction_daemon.erl
+++ b/src/couch/src/couch_compaction_daemon.erl
@@ -319,7 +319,7 @@ can_db_compact(#config{db_frag = Threshold} = Config, Db) ->
{Frag, SpaceRequired} = frag(DbInfo),
couch_log:debug("Fragmentation for database `~s` is ~p%, estimated"
" space for compaction is ~p bytes.",
- [Db#db.name, Frag, SpaceRequired]),
+ [couch_db:name(Db), Frag, SpaceRequired]),
case check_frag(Threshold, Frag) of
false ->
false;
@@ -332,7 +332,7 @@ can_db_compact(#config{db_frag = Threshold} = Config, Db) ->
couch_log:warning("Compaction daemon - skipping database `~s` "
"compaction: the estimated necessary disk space is about ~p"
" bytes but the currently available disk space is ~p bytes.",
- [Db#db.name, SpaceRequired, Free]),
+ [couch_db:name(Db), SpaceRequired, Free]),
false
end
end
diff --git a/src/couch/src/couch_compress.erl b/src/couch/src/couch_compress.erl
index 71588b228..cfcc2a481 100644
--- a/src/couch/src/couch_compress.erl
+++ b/src/couch/src/couch_compress.erl
@@ -14,6 +14,7 @@
-export([compress/2, decompress/1, is_compressed/2]).
-export([get_compression_method/0]).
+-export([uncompressed_size/1]).
-include_lib("couch/include/couch_db.hrl").
@@ -83,3 +84,16 @@ is_compressed(Term, _Method) when not is_binary(Term) ->
is_compressed(_, _) ->
error(invalid_compression).
+
+uncompressed_size(<<?SNAPPY_PREFIX, Rest/binary>>) ->
+ {ok, Size} = snappy:uncompressed_length(Rest),
+ Size;
+uncompressed_size(<<?COMPRESSED_TERM_PREFIX, Size:32, _/binary>> = _Bin) ->
+ % See http://erlang.org/doc/apps/erts/erl_ext_dist.html
+ % The uncompressed binary would be encoded with <<131, Rest/binary>>
+ % so need to add 1 for 131
+ Size + 1;
+uncompressed_size(<<?TERM_PREFIX, _/binary>> = Bin) ->
+ byte_size(Bin);
+uncompressed_size(_) ->
+ error(invalid_compression).
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 7a1afa750..5e720c284 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -12,32 +12,118 @@
-module(couch_db).
--export([open/2,open_int/2,close/1,create/2,get_db_info/1,get_design_docs/1]).
--export([start_compact/1, cancel_compact/1]).
--export([wait_for_compaction/1, wait_for_compaction/2]).
--export([is_idle/1,monitor/1,count_changes_since/2]).
--export([update_doc/3,update_doc/4,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
--export([get_doc_info/2,get_full_doc_info/2,get_full_doc_infos/2]).
--export([open_doc/2,open_doc/3,open_doc_revs/4]).
--export([set_revs_limit/2,get_revs_limit/1]).
--export([get_missing_revs/2,name/1,get_update_seq/1,get_committed_update_seq/1]).
--export([get_uuid/1, get_epochs/1, get_compacted_seq/1]).
--export([enum_docs/4,enum_docs_since/5]).
--export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
--export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
--export([start_link/3,open_doc_int/3,ensure_full_commit/1,ensure_full_commit/2]).
--export([set_security/2,get_security/1]).
--export([changes_since/4,changes_since/5,read_doc/2,new_revid/1]).
--export([check_is_admin/1, is_admin/1, check_is_member/1, get_doc_count/1]).
--export([reopen/1, is_system_db/1, compression/1, make_doc/5]).
--export([load_validation_funs/1]).
--export([check_md5/2, with_stream/3]).
--export([monitored_by/1]).
--export([normalize_dbname/1]).
--export([validate_dbname/1]).
--export([dbname_suffix/1]).
+-export([
+ create/2,
+ open/2,
+ open_int/2,
+ incref/1,
+ reopen/1,
+ close/1,
+
+ clustered_db/2,
+ clustered_db/3,
+
+ monitor/1,
+ monitored_by/1,
+ is_idle/1,
+
+ is_admin/1,
+ check_is_admin/1,
+ check_is_member/1,
+
+ name/1,
+ compression/1,
+ get_after_doc_read_fun/1,
+ get_before_doc_update_fun/1,
+ get_committed_update_seq/1,
+ get_compacted_seq/1,
+ get_compactor_pid/1,
+ get_db_info/1,
+ get_doc_count/1,
+ get_epochs/1,
+ get_filepath/1,
+ get_instance_start_time/1,
+ get_last_purged/1,
+ get_pid/1,
+ get_revs_limit/1,
+ get_security/1,
+ get_update_seq/1,
+ get_user_ctx/1,
+ get_uuid/1,
+ get_purge_seq/1,
+
+ is_db/1,
+ is_system_db/1,
+ is_clustered/1,
+
+ increment_update_seq/1,
+ set_revs_limit/2,
+ set_security/2,
+ set_user_ctx/2,
+
+ ensure_full_commit/1,
+ ensure_full_commit/2,
+
+ load_validation_funs/1,
+
+ open_doc/2,
+ open_doc/3,
+ open_doc_revs/4,
+ open_doc_int/3,
+ read_doc/2,
+ get_doc_info/2,
+ get_full_doc_info/2,
+ get_full_doc_infos/2,
+ get_missing_revs/2,
+ get_design_docs/1,
+
+ update_doc/3,
+ update_doc/4,
+ update_docs/4,
+ update_docs/2,
+ update_docs/3,
+ delete_doc/3,
+
+ purge_docs/2,
+
+ with_stream/3,
+
+ fold_docs/4,
+ fold_local_docs/4,
+ enum_docs/4,
+ enum_docs_reduce_to_count/1,
+
+ enum_docs_since/5,
+ enum_docs_since_reduce_to_count/1,
+ changes_since/4,
+ changes_since/5,
+ count_changes_since/2,
+
+ calculate_start_seq/3,
+ owner_of/2,
+
+ start_compact/1,
+ cancel_compact/1,
+ wait_for_compaction/1,
+ wait_for_compaction/2,
+
+ dbname_suffix/1,
+ normalize_dbname/1,
+ validate_dbname/1,
+
+ check_md5/2,
+ make_doc/5,
+ new_revid/1
+]).
+
+
+-export([
+ start_link/3
+]).
+
-include_lib("couch/include/couch_db.hrl").
+-include("couch_db_int.hrl").
-define(DBNAME_REGEX,
"^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*" % use the stock CouchDB regex
@@ -112,9 +198,31 @@ reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) ->
{ok, NewDb#db{user_ctx = UserCtx, fd_monitor = NewRef}}
end.
+incref(#db{fd = Fd} = Db) ->
+ Ref = erlang:monitor(process, Fd),
+ {ok, Db#db{fd_monitor = Ref}}.
+
+clustered_db(DbName, UserCtx) ->
+ clustered_db(DbName, UserCtx, []).
+
+clustered_db(DbName, UserCtx, SecProps) ->
+ {ok, #db{name = DbName, user_ctx = UserCtx, security = SecProps}}.
+
+is_db(#db{}) ->
+ true;
+is_db(_) ->
+ false.
+
is_system_db(#db{options = Options}) ->
lists:member(sys_db, Options).
+is_clustered(#db{main_pid = nil}) ->
+ true;
+is_clustered(#db{}) ->
+ false;
+is_clustered(?NEW_PSE_DB = Db) ->
+ ?PSE_DB_MAIN_PID(Db) == undefined.
+
ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) ->
ok = gen_server:call(Pid, full_commit, infinity),
{ok, StartTime}.
@@ -126,6 +234,8 @@ ensure_full_commit(Db, RequiredSeq) ->
close(#db{fd_monitor=Ref}) ->
erlang:demonitor(Ref, [flush]),
+ ok;
+close(?NEW_PSE_DB) ->
ok.
is_idle(#db{compactor_pid=nil, waiting_delayed_commit=nil} = Db) ->
@@ -295,12 +405,23 @@ increment_update_seq(#db{main_pid=Pid}) ->
purge_docs(#db{main_pid=Pid}, IdsRevs) ->
gen_server:call(Pid, {purge_docs, IdsRevs}).
+get_after_doc_read_fun(#db{after_doc_read = Fun}) ->
+ Fun.
+
+get_before_doc_update_fun(#db{before_doc_update = Fun}) ->
+ Fun.
+
get_committed_update_seq(#db{committed_update_seq=Seq}) ->
Seq.
get_update_seq(#db{update_seq=Seq})->
Seq.
+get_user_ctx(#db{user_ctx = UserCtx}) ->
+ UserCtx;
+get_user_ctx(?NEW_PSE_DB = Db) ->
+ ?PSE_DB_USER_CTX(Db).
+
get_purge_seq(#db{}=Db) ->
couch_db_header:purge_seq(Db#db.header).
@@ -312,19 +433,33 @@ get_last_purged(#db{}=Db) ->
couch_file:pread_term(Db#db.fd, Pointer)
end.
+get_pid(#db{main_pid = Pid}) ->
+ Pid.
+
get_doc_count(Db) ->
- {ok, {Count, _, _}} = couch_btree:full_reduce(Db#db.id_tree),
- {ok, Count}.
+ {ok, Reds} = couch_btree:full_reduce(Db#db.id_tree),
+ {ok, element(1, Reds)}.
get_uuid(#db{}=Db) ->
couch_db_header:uuid(Db#db.header).
get_epochs(#db{}=Db) ->
- couch_db_header:epochs(Db#db.header).
+ Epochs = couch_db_header:epochs(Db#db.header),
+ validate_epochs(Epochs),
+ Epochs.
+
+get_filepath(#db{filepath = FilePath}) ->
+ FilePath.
+
+get_instance_start_time(#db{instance_start_time = IST}) ->
+ IST.
get_compacted_seq(#db{}=Db) ->
couch_db_header:compacted_seq(Db#db.header).
+get_compactor_pid(#db{compactor_pid = Pid}) ->
+ Pid.
+
get_db_info(Db) ->
#db{fd=Fd,
header=Header,
@@ -503,7 +638,9 @@ get_members(#db{security=SecProps}) ->
couch_util:get_value(<<"readers">>, SecProps, {[]})).
get_security(#db{security=SecProps}) ->
- {SecProps}.
+ {SecProps};
+get_security(?NEW_PSE_DB = Db) ->
+ {?PSE_DB_SECURITY(Db)}.
set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) ->
check_is_admin(Db),
@@ -514,6 +651,9 @@ set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) ->
set_security(_, _) ->
throw(bad_request).
+set_user_ctx(#db{} = Db, UserCtx) ->
+ {ok, Db#db{user_ctx = UserCtx}}.
+
validate_security_object(SecProps) ->
Admins = couch_util:get_value(<<"admins">>, SecProps, {[]}),
% we fallback to readers here for backwards compatibility
@@ -549,7 +689,9 @@ set_revs_limit(_Db, _Limit) ->
throw(invalid_revs_limit).
name(#db{name=Name}) ->
- Name.
+ Name;
+name(?NEW_PSE_DB = Db) ->
+ ?PSE_DB_NAME(Db).
compression(#db{compression=Compression}) ->
Compression.
@@ -1275,6 +1417,17 @@ enum_docs_since(Db, SinceSeq, InFun, Acc, Options) ->
[{start_key, SinceSeq + 1} | Options]),
{ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}.
+
+fold_docs(Db, InFun, InAcc, Opts) ->
+ Wrapper = fun(FDI, _, Acc) -> InFun(FDI, Acc) end,
+ {ok, _, AccOut} = couch_btree:fold(Db#db.id_tree, Wrapper, InAcc, Opts),
+ {ok, AccOut}.
+
+fold_local_docs(Db, InFun, InAcc, Opts) ->
+ Wrapper = fun(FDI, _, Acc) -> InFun(FDI, Acc) end,
+ {ok, _, AccOut} = couch_btree:fold(Db#db.local_tree, Wrapper, InAcc, Opts),
+ {ok, AccOut}.
+
enum_docs(Db, InFun, InAcc, Options0) ->
{NS, Options} = extract_namespace(Options0),
enum_docs(Db, NS, InFun, InAcc, Options).
@@ -1298,6 +1451,78 @@ enum_docs(Db, NS, InFun, InAcc, Options0) ->
Db#db.id_tree, FoldFun, InAcc, Options),
{ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.
+
+calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) ->
+ Seq;
+calculate_start_seq(Db, Node, {Seq, Uuid}) ->
+ % Treat the current node as the epoch node
+ calculate_start_seq(Db, Node, {Seq, Uuid, Node});
+calculate_start_seq(Db, _Node, {Seq, Uuid, EpochNode}) ->
+ case is_prefix(Uuid, get_uuid(Db)) of
+ true ->
+ case is_owner(EpochNode, Seq, get_epochs(Db)) of
+ true -> Seq;
+ false -> 0
+ end;
+ false ->
+ %% The file was rebuilt, most likely in a different
+ %% order, so rewind.
+ 0
+ end;
+calculate_start_seq(Db, _Node, {replace, OriginalNode, Uuid, Seq}) ->
+ case is_prefix(Uuid, couch_db:get_uuid(Db)) of
+ true ->
+ start_seq(get_epochs(Db), OriginalNode, Seq);
+ false ->
+ {replace, OriginalNode, Uuid, Seq}
+ end.
+
+
+validate_epochs(Epochs) ->
+ %% Assert uniqueness.
+ case length(Epochs) == length(lists:ukeysort(2, Epochs)) of
+ true -> ok;
+ false -> erlang:error(duplicate_epoch)
+ end,
+ %% Assert order.
+ case Epochs == lists:sort(fun({_, A}, {_, B}) -> B =< A end, Epochs) of
+ true -> ok;
+ false -> erlang:error(epoch_order)
+ end.
+
+
+is_prefix(Pattern, Subject) ->
+ binary:longest_common_prefix([Pattern, Subject]) == size(Pattern).
+
+
+is_owner(Node, Seq, Epochs) ->
+ Node =:= owner_of(Epochs, Seq).
+
+
+owner_of(Db, Seq) when not is_list(Db) ->
+ owner_of(get_epochs(Db), Seq);
+owner_of([], _Seq) ->
+ undefined;
+owner_of([{EpochNode, EpochSeq} | _Rest], Seq) when Seq > EpochSeq ->
+ EpochNode;
+owner_of([_ | Rest], Seq) ->
+ owner_of(Rest, Seq).
+
+
+start_seq([{OrigNode, EpochSeq} | _], OrigNode, Seq) when Seq > EpochSeq ->
+ %% OrigNode is the owner of the Seq so we can safely stream from there
+ Seq;
+start_seq([{_, NewSeq}, {OrigNode, _} | _], OrigNode, Seq) when Seq > NewSeq ->
+ %% We transferred this file before Seq was written on OrigNode, so we need
+ %% to stream from the beginning of the next epoch. Note that it is _not_
+ %% necessary for the current node to own the epoch beginning at NewSeq
+ NewSeq;
+start_seq([_ | Rest], OrigNode, Seq) ->
+ start_seq(Rest, OrigNode, Seq);
+start_seq([], OrigNode, Seq) ->
+ erlang:error({epoch_mismatch, OrigNode, Seq}).
+
+
extract_namespace(Options0) ->
case proplists:split(Options0, [namespace]) of
{[[{namespace, NS}]], Options} ->
@@ -1636,6 +1861,30 @@ should_fail_validate_dbname(DbName) ->
ok
end)}.
+calculate_start_seq_test() ->
+ %% uuid mismatch is always a rewind.
+ Hdr1 = couch_db_header:new(),
+ Hdr2 = couch_db_header:set(Hdr1, [{epochs, [{node1, 1}]}, {uuid, <<"uuid1">>}]),
+ ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, node1, {1, <<"uuid2">>})),
+ %% uuid matches and seq is owned by node.
+ Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]),
+ ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, node1, {2, <<"uuid1">>})),
+ %% uuids match but seq is not owned by node.
+ Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]),
+ ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, node1, {3, <<"uuid1">>})),
+ %% return integer if we didn't get a vector.
+ ?assertEqual(4, calculate_start_seq(#db{}, foo, 4)).
+
+is_owner_test() ->
+ ?assertNot(is_owner(foo, 1, [])),
+ ?assertNot(is_owner(foo, 1, [{foo, 1}])),
+ ?assert(is_owner(foo, 2, [{foo, 1}])),
+ ?assert(is_owner(foo, 50, [{bar, 100}, {foo, 1}])),
+ ?assert(is_owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])),
+ ?assert(is_owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])),
+ ?assertError(duplicate_epoch, validate_epochs([{foo, 1}, {bar, 1}])),
+ ?assertError(epoch_order, validate_epochs([{foo, 100}, {bar, 200}])).
+
to_binary(DbName) when is_list(DbName) ->
?l2b(DbName);
to_binary(DbName) when is_binary(DbName) ->
diff --git a/src/couch/src/couch_db_int.hrl b/src/couch/src/couch_db_int.hrl
new file mode 100644
index 000000000..da1e45d75
--- /dev/null
+++ b/src/couch/src/couch_db_int.hrl
@@ -0,0 +1,93 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-record(db, {
+ main_pid = nil,
+ compactor_pid = nil,
+ instance_start_time, % number of microsecs since jan 1 1970 as a binary string
+ fd,
+ fd_monitor,
+ header = couch_db_header:new(),
+ committed_update_seq,
+ id_tree,
+ seq_tree,
+ local_tree,
+ update_seq,
+ name,
+ filepath,
+ validate_doc_funs = undefined,
+ security = [],
+ security_ptr = nil,
+ user_ctx = #user_ctx{},
+ waiting_delayed_commit = nil,
+ revs_limit = 1000,
+ fsync_options = [],
+ options = [],
+ compression,
+ before_doc_update = nil, % nil | fun(Doc, Db) -> NewDoc
+ after_doc_read = nil % nil | fun(Doc, Db) -> NewDoc
+}).
+
+
+-record(new_pse_db, {
+ vsn,
+ name,
+ filepath,
+
+ engine = {couch_bt_engine, undefined},
+
+ main_pid = nil,
+ compactor_pid = nil,
+
+ committed_update_seq,
+
+ instance_start_time, % number of microsecs since jan 1 1970 as a binary string
+
+ user_ctx = #user_ctx{},
+ security = [],
+ validate_doc_funs = undefined,
+
+ before_doc_update = nil, % nil | fun(Doc, Db) -> NewDoc
+ after_doc_read = nil, % nil | fun(Doc, Db) -> NewDoc
+
+ waiting_delayed_commit = nil,
+
+ options = [],
+ compression
+}).
+
+
+-define(NEW_PSE_DB, {
+ db,
+ _, % Version
+ _, % Name
+ _, % FilePath
+ _, % Engine
+ _, % MainPid
+ _, % CompactorPid
+ _, % CommittedUpdateSeq
+ _, % InstanceStartTime
+ _, % UserCtx
+ _, % Security
+ _, % ValidateDocFuns
+ _, % BeforeDocUpdate
+ _, % AfterDocRead
+ _, % WaitingDelayedCommit
+ _, % Options
+ _ % Compression
+}).
+
+
+-define(PSE_DB_NAME(Db), element(3, Db)).
+-define(PSE_DB_MAIN_PID(Db), element(6, Db)).
+-define(PSE_DB_USER_CTX(Db), element(10, Db)).
+-define(PSE_DB_SECURITY(Db), element(11, Db)).
diff --git a/src/couch/src/couch_db_plugin.erl b/src/couch/src/couch_db_plugin.erl
index 774e9e094..740b8121b 100644
--- a/src/couch/src/couch_db_plugin.erl
+++ b/src/couch/src/couch_db_plugin.erl
@@ -32,13 +32,15 @@
validate_dbname(DbName, Normalized, Default) ->
maybe_handle(validate_dbname, [DbName, Normalized], Default).
-before_doc_update(#db{before_doc_update = Fun} = Db, Doc0) ->
+before_doc_update(Db, Doc0) ->
+ Fun = couch_db:get_before_doc_update_fun(Db),
case with_pipe(before_doc_update, [Doc0, Db]) of
[Doc1, _Db] when is_function(Fun) -> Fun(Doc1, Db);
[Doc1, _Db] -> Doc1
end.
-after_doc_read(#db{after_doc_read = Fun} = Db, Doc0) ->
+after_doc_read(Db, Doc0) ->
+ Fun = couch_db:get_after_doc_read_fun(Db),
case with_pipe(after_doc_read, [Doc0, Db]) of
[Doc1, _Db] when is_function(Fun) -> Fun(Doc1, Db);
[Doc1, _Db] -> Doc1
diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl
index 78e0b8c19..ca61e04c6 100644
--- a/src/couch/src/couch_db_updater.erl
+++ b/src/couch/src/couch_db_updater.erl
@@ -20,6 +20,7 @@
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
-include_lib("couch/include/couch_db.hrl").
+-include("couch_db_int.hrl").
-define(IDLE_LIMIT_DEFAULT, 61000).
@@ -1079,14 +1080,13 @@ copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
{Body, AttInfos} = copy_doc_attachments(Db, Sp, DestFd),
% In the future, we should figure out how to do this for
% upgrade purposes.
- EJsonBody = case is_binary(Body) of
+ ExternalSize = case is_binary(Body) of
true ->
- couch_compress:decompress(Body);
+ couch_compress:uncompressed_size(Body);
false ->
- Body
+ ?term_size(Body)
end,
SummaryChunk = make_doc_summary(NewDb, {Body, AttInfos}),
- ExternalSize = ?term_size(EJsonBody),
{ok, Pos, SummarySize} = couch_file:append_raw_chunk(
DestFd, SummaryChunk),
AttSizes = [{element(3,A), element(4,A)} || A <- AttInfos],
@@ -1472,7 +1472,7 @@ get_meta_body_size(Meta, Summary) ->
{ejson_size, ExternalSize} ->
ExternalSize;
false ->
- ?term_size(couch_compress:decompress(Summary))
+ couch_compress:uncompressed_size(Summary)
end.
diff --git a/src/couch/src/couch_httpd_db.erl b/src/couch/src/couch_httpd_db.erl
index a6d83d619..34a1539aa 100644
--- a/src/couch/src/couch_httpd_db.erl
+++ b/src/couch/src/couch_httpd_db.erl
@@ -70,7 +70,8 @@ handle_changes_req(#httpd{method='GET'}=Req, Db, ChangesArgs, ChangesFun) ->
handle_changes_req(#httpd{}=Req, _Db, _ChangesArgs, _ChangesFun) ->
couch_httpd:send_method_not_allowed(Req, "GET,HEAD,POST").
-handle_changes_req1(Req, #db{name=DbName}=Db, ChangesArgs, ChangesFun) ->
+handle_changes_req1(Req, Db, ChangesArgs, ChangesFun) ->
+ DbName = couch_db:name(Db),
AuthDbName = ?l2b(config:get("couch_httpd_auth", "authentication_db")),
case AuthDbName of
DbName ->
@@ -287,7 +288,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_ensure_full_commit">>]}=Req, Db) -
RequiredSeq > CommittedSeq ->
couch_db:ensure_full_commit(Db);
true ->
- {ok, Db#db.instance_start_time}
+ {ok, couch_db:get_instance_start_time(Db)}
end
end,
send_json(Req, 201, {[
@@ -733,7 +734,8 @@ update_doc_result_to_json(DocId, Error) ->
update_doc(Req, Db, DocId, #doc{deleted=false}=Doc) ->
- Loc = absolute_uri(Req, "/" ++ couch_util:url_encode(Db#db.name) ++ "/" ++ couch_util:url_encode(DocId)),
+ DbName = couch_db:name(Db),
+ Loc = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName) ++ "/" ++ couch_util:url_encode(DocId)),
update_doc(Req, Db, DocId, Doc, [{"Location", Loc}]);
update_doc(Req, Db, DocId, Doc) ->
update_doc(Req, Db, DocId, Doc, []).
@@ -1037,7 +1039,7 @@ db_attachment_req(#httpd{method=Method,mochi_req=MochiReq}=Req, Db, DocId, FileN
[];
_ ->
[{"Location", absolute_uri(Req, "/" ++
- couch_util:url_encode(Db#db.name) ++ "/" ++
+ couch_util:url_encode(couch_db:name(Db)) ++ "/" ++
couch_util:url_encode(DocId) ++ "/" ++
couch_util:url_encode(FileName)
)}]
@@ -1149,7 +1151,7 @@ parse_changes_query(Req, Db) ->
{"descending", "true"} ->
Args#changes_args{dir=rev};
{"since", "now"} ->
- UpdateSeq = couch_util:with_db(Db#db.name, fun(WDb) ->
+ UpdateSeq = couch_util:with_db(couch_db:name(Db), fun(WDb) ->
couch_db:get_update_seq(WDb)
end),
Args#changes_args{since=UpdateSeq};
diff --git a/src/couch/src/couch_lru.erl b/src/couch/src/couch_lru.erl
index b58a623d6..023515e7c 100644
--- a/src/couch/src/couch_lru.erl
+++ b/src/couch/src/couch_lru.erl
@@ -13,7 +13,7 @@
-module(couch_lru).
-export([new/0, insert/2, update/2, close/1]).
--include_lib("couch/include/couch_db.hrl").
+-include("couch_server_int.hrl").
new() ->
{gb_trees:empty(), dict:new()}.
@@ -43,16 +43,17 @@ close({Tree, _} = Cache) ->
close_int(none, _) ->
false;
close_int({Lru, DbName, Iter}, {Tree, Dict} = Cache) ->
- case ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}) of
+ case ets:update_element(couch_dbs, DbName, {#entry.lock, locked}) of
true ->
- [#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName),
+ [#entry{db = Db, pid = Pid}] = ets:lookup(couch_dbs, DbName),
case couch_db:is_idle(Db) of true ->
true = ets:delete(couch_dbs, DbName),
true = ets:delete(couch_dbs_pid_to_name, Pid),
exit(Pid, kill),
{true, {gb_trees:delete(Lru, Tree), dict:erase(DbName, Dict)}};
false ->
- true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, nil}),
+ ElemSpec = {#entry.lock, unlocked},
+ true = ets:update_element(couch_dbs, DbName, ElemSpec),
couch_stats:increment_counter([couchdb, couch_server, lru_skip]),
close_int(gb_trees:next(Iter), update(DbName, Cache))
end;
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index 24016e05c..efcef714e 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -27,6 +27,7 @@
-export([handle_config_change/5, handle_config_terminate/3]).
-include_lib("couch/include/couch_db.hrl").
+-include("couch_server_int.hrl").
-define(MAX_DBS_OPEN, 500).
-define(RELISTEN_DELAY, 5000).
@@ -74,16 +75,18 @@ sup_start_link() ->
open(DbName, Options0) ->
Ctx = couch_util:get_value(user_ctx, Options0, #user_ctx{}),
case ets:lookup(couch_dbs, DbName) of
- [#db{fd=Fd, fd_monitor=Lock, options=Options} = Db] when Lock =/= locked ->
- update_lru(DbName, Options),
- {ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}};
+ [#entry{db = Db0, lock = Lock} = Entry] when Lock =/= locked ->
+ update_lru(DbName, Entry#entry.db_options),
+ {ok, Db1} = couch_db:incref(Db0),
+ couch_db:set_user_ctx(Db1, Ctx);
_ ->
Options = maybe_add_sys_db_callbacks(DbName, Options0),
Timeout = couch_util:get_value(timeout, Options, infinity),
Create = couch_util:get_value(create_if_missing, Options, false),
case gen_server:call(couch_server, {open, DbName, Options}, Timeout) of
- {ok, #db{fd=Fd} = Db} ->
- {ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}};
+ {ok, Db0} ->
+ {ok, Db1} = couch_db:incref(Db0),
+ couch_db:set_user_ctx(Db1, Ctx);
{not_found, no_db_file} when Create ->
couch_log:warning("creating missing database: ~s", [DbName]),
couch_server:create(DbName, Options);
@@ -104,9 +107,10 @@ close_lru() ->
create(DbName, Options0) ->
Options = maybe_add_sys_db_callbacks(DbName, Options0),
case gen_server:call(couch_server, {create, DbName, Options}, infinity) of
- {ok, #db{fd=Fd} = Db} ->
+ {ok, Db0} ->
Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
- {ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}};
+ {ok, Db1} = couch_db:incref(Db0),
+ couch_db:set_user_ctx(Db1, Ctx);
Error ->
Error
end.
@@ -176,9 +180,9 @@ hash_admin_passwords(Persist) ->
close_db_if_idle(DbName) ->
case ets:lookup(couch_dbs, DbName) of
- [#db{}] ->
+ [#entry{}] ->
gen_server:cast(couch_server, {close_db_if_idle, DbName});
- _ ->
+ [] ->
ok
end.
@@ -197,7 +201,7 @@ init([]) ->
ok = config:listen_for_changes(?MODULE, nil),
ok = couch_file:init_delete_dir(RootDir),
hash_admin_passwords(),
- ets:new(couch_dbs, [set, protected, named_table, {keypos, #db.name}]),
+ ets:new(couch_dbs, [set, protected, named_table, {keypos, #entry.name}]),
ets:new(couch_dbs_pid_to_name, [set, protected, named_table]),
process_flag(trap_exit, true),
{ok, #server{root_dir=RootDir,
@@ -209,8 +213,9 @@ terminate(Reason, Srv) ->
couch_log:error("couch_server terminating with ~p, state ~2048p",
[Reason,
Srv#server{lru = redacted}]),
- ets:foldl(fun(#db{main_pid=Pid}, _) -> couch_util:shutdown_sync(Pid) end,
- nil, couch_dbs),
+ ets:foldl(fun(#entry{db = Db}, _) ->
+ couch_util:shutdown_sync(couch_db:get_pid(Db))
+ end, nil, couch_dbs),
ok.
handle_config_change("couchdb", "database_dir", _, _, _) ->
@@ -316,15 +321,13 @@ open_async(Server, From, DbName, Filepath, Options) ->
true -> create;
false -> open
end,
- % icky hack of field values - compactor_pid used to store clients
- % and fd used for opening request info
- true = ets:insert(couch_dbs, #db{
+ true = ets:insert(couch_dbs, #entry{
name = DbName,
- fd = ReqType,
- main_pid = Opener,
- compactor_pid = [From],
- fd_monitor = locked,
- options = Options
+ pid = Opener,
+ lock = locked,
+ waiters = [From],
+ req_type = ReqType,
+ db_options = Options
}),
true = ets:insert(couch_dbs_pid_to_name, {Opener, DbName}),
db_opened(Server, Options).
@@ -348,16 +351,15 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) ->
true = ets:delete(couch_dbs_pid_to_name, FromPid),
OpenTime = timer:now_diff(os:timestamp(), T0) / 1000,
couch_stats:update_histogram([couchdb, db_open_time], OpenTime),
- % icky hack of field values - compactor_pid used to store clients
- % and fd used to possibly store a creation request
+ DbPid = couch_db:get_pid(Db),
case ets:lookup(couch_dbs, DbName) of
[] ->
% db was deleted during async open
- exit(Db#db.main_pid, kill),
+ exit(DbPid, kill),
{reply, ok, Server};
- [#db{fd=ReqType, compactor_pid=Froms}] ->
- link(Db#db.main_pid),
- [gen_server:reply(From, {ok, Db}) || From <- Froms],
+ [#entry{req_type = ReqType, waiters = Waiters} = Entry] ->
+ link(DbPid),
+ [gen_server:reply(Waiter, {ok, Db}) || Waiter <- Waiters],
% Cancel the creation request if it exists.
case ReqType of
{create, DbName, _Filepath, _Options, CrFrom} ->
@@ -365,8 +367,15 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) ->
_ ->
ok
end,
- true = ets:insert(couch_dbs, Db),
- true = ets:insert(couch_dbs_pid_to_name, {Db#db.main_pid, DbName}),
+ true = ets:insert(couch_dbs, #entry{
+ name = DbName,
+ db = Db,
+ pid = DbPid,
+ lock = unlocked,
+ db_options = Entry#entry.db_options,
+ start_time = couch_db:get_instance_start_time(Db)
+ }),
+ true = ets:insert(couch_dbs_pid_to_name, {DbPid, DbName}),
Lru = case couch_db:is_system_db(Db) of
false ->
couch_lru:insert(DbName, Server#server.lru);
@@ -378,13 +387,12 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) ->
handle_call({open_result, T0, DbName, {error, eexist}}, From, Server) ->
handle_call({open_result, T0, DbName, file_exists}, From, Server);
handle_call({open_result, _T0, DbName, Error}, {FromPid, _Tag}, Server) ->
- % icky hack of field values - compactor_pid used to store clients
case ets:lookup(couch_dbs, DbName) of
[] ->
% db was deleted during async open
{reply, ok, Server};
- [#db{fd=ReqType, compactor_pid=Froms}=Db] ->
- [gen_server:reply(From, Error) || From <- Froms],
+ [#entry{req_type = ReqType, waiters = Waiters} = Entry] ->
+ [gen_server:reply(Waiter, Error) || Waiter <- Waiters],
couch_log:info("open_result error ~p for ~s", [Error, DbName]),
true = ets:delete(couch_dbs, DbName),
true = ets:delete(couch_dbs_pid_to_name, FromPid),
@@ -394,7 +402,7 @@ handle_call({open_result, _T0, DbName, Error}, {FromPid, _Tag}, Server) ->
_ ->
Server
end,
- {reply, ok, db_closed(NewServer, Db#db.options)}
+ {reply, ok, db_closed(NewServer, Entry#entry.db_options)}
end;
handle_call({open, DbName, Options}, From, Server) ->
case ets:lookup(couch_dbs, DbName) of
@@ -412,15 +420,14 @@ handle_call({open, DbName, Options}, From, Server) ->
Error ->
{reply, Error, Server}
end;
- [#db{compactor_pid = Froms} = Db] when is_list(Froms) ->
- % icky hack of field values - compactor_pid used to store clients
- true = ets:insert(couch_dbs, Db#db{compactor_pid = [From|Froms]}),
- if length(Froms) =< 10 -> ok; true ->
+ [#entry{waiters = Waiters} = Entry] when is_list(Waiters) ->
+ true = ets:insert(couch_dbs, Entry#entry{waiters = [From | Waiters]}),
+ if length(Waiters) =< 10 -> ok; true ->
Fmt = "~b clients waiting to open db ~s",
- couch_log:info(Fmt, [length(Froms), DbName])
+ couch_log:info(Fmt, [length(Waiters), DbName])
end,
{noreply, Server};
- [#db{} = Db] ->
+ [#entry{db = Db}] ->
{reply, {ok, Db}, Server}
end;
handle_call({create, DbName, Options}, From, Server) ->
@@ -437,14 +444,13 @@ handle_call({create, DbName, Options}, From, Server) ->
CloseError ->
{reply, CloseError, Server}
end;
- [#db{fd=open}=Db] ->
+ [#entry{req_type = open} = Entry] ->
% We're trying to create a database while someone is in
% the middle of trying to open it. We allow one creator
% to wait while we figure out if it'll succeed.
- % icky hack of field values - fd used to store create request
CrOptions = [create | Options],
- NewDb = Db#db{fd={create, DbName, Filepath, CrOptions, From}},
- true = ets:insert(couch_dbs, NewDb),
+ Req = {create, DbName, Filepath, CrOptions, From},
+ true = ets:insert(couch_dbs, Entry#entry{req_type = Req}),
{noreply, Server};
[_AlreadyRunningDb] ->
{reply, file_exists, Server}
@@ -460,18 +466,17 @@ handle_call({delete, DbName, Options}, _From, Server) ->
Server2 =
case ets:lookup(couch_dbs, DbName) of
[] -> Server;
- [#db{main_pid=Pid, compactor_pid=Froms} = Db] when is_list(Froms) ->
- % icky hack of field values - compactor_pid used to store clients
+ [#entry{pid = Pid, waiters = Waiters} = Entry] when is_list(Waiters) ->
true = ets:delete(couch_dbs, DbName),
true = ets:delete(couch_dbs_pid_to_name, Pid),
exit(Pid, kill),
- [gen_server:reply(F, not_found) || F <- Froms],
- db_closed(Server, Db#db.options);
- [#db{main_pid=Pid} = Db] ->
+ [gen_server:reply(Waiter, not_found) || Waiter <- Waiters],
+ db_closed(Server, Entry#entry.db_options);
+ [#entry{pid = Pid} = Entry] ->
true = ets:delete(couch_dbs, DbName),
true = ets:delete(couch_dbs_pid_to_name, Pid),
exit(Pid, kill),
- db_closed(Server, Db#db.options)
+ db_closed(Server, Entry#entry.db_options)
end,
%% Delete any leftover compaction files. If we don't do this a
@@ -497,11 +502,12 @@ handle_call({delete, DbName, Options}, _From, Server) ->
Error ->
{reply, Error, Server}
end;
-handle_call({db_updated, #db{}=Db}, _From, Server0) ->
- #db{name = DbName, instance_start_time = StartTime} = Db,
- Server = try ets:lookup_element(couch_dbs, DbName, #db.instance_start_time) of
+handle_call({db_updated, Db}, _From, Server0) ->
+ DbName = couch_db:name(Db),
+ StartTime = couch_db:get_instance_start_time(Db),
+ Server = try ets:lookup_element(couch_dbs, DbName, #entry.start_time) of
StartTime ->
- true = ets:insert(couch_dbs, Db),
+ true = ets:update_element(couch_dbs, DbName, {#entry.db, Db}),
Lru = case couch_db:is_system_db(Db) of
false -> couch_lru:update(DbName, Server0#server.lru);
true -> Server0#server.lru
@@ -519,17 +525,19 @@ handle_cast({update_lru, DbName}, #server{lru = Lru, update_lru_on_read=true} =
handle_cast({update_lru, _DbName}, Server) ->
{noreply, Server};
handle_cast({close_db_if_idle, DbName}, Server) ->
- case ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}) of
+ case ets:update_element(couch_dbs, DbName, {#entry.lock, locked}) of
true ->
- [#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName),
+ [#entry{db = Db, db_options = DbOpts}] = ets:lookup(couch_dbs, DbName),
case couch_db:is_idle(Db) of
true ->
+ DbPid = couch_db:get_pid(Db),
true = ets:delete(couch_dbs, DbName),
- true = ets:delete(couch_dbs_pid_to_name, Pid),
- exit(Pid, kill),
- {noreply, db_closed(Server, Db#db.options)};
+ true = ets:delete(couch_dbs_pid_to_name, DbPid),
+ exit(DbPid, kill),
+ {noreply, db_closed(Server, DbOpts)};
false ->
- true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, nil}),
+ true = ets:update_element(
+ couch_dbs, DbName, {#entry.lock, unlocked}),
{noreply, Server}
end;
false ->
@@ -547,22 +555,19 @@ handle_info({'EXIT', _Pid, config_change}, Server) ->
handle_info({'EXIT', Pid, Reason}, Server) ->
case ets:lookup(couch_dbs_pid_to_name, Pid) of
[{Pid, DbName}] ->
- [#db{compactor_pid=Froms}=Db] = ets:lookup(couch_dbs, DbName),
+ [#entry{waiters = Waiters} = Entry] = ets:lookup(couch_dbs, DbName),
if Reason /= snappy_nif_not_loaded -> ok; true ->
Msg = io_lib:format("To open the database `~s`, Apache CouchDB "
"must be built with Erlang OTP R13B04 or higher.", [DbName]),
couch_log:error(Msg, [])
end,
couch_log:info("db ~s died with reason ~p", [DbName, Reason]),
- % icky hack of field values - compactor_pid used to store clients
- if is_list(Froms) ->
- [gen_server:reply(From, Reason) || From <- Froms];
- true ->
- ok
+ if not is_list(Waiters) -> ok; true ->
+ [gen_server:reply(Waiter, Reason) || Waiter <- Waiters]
end,
true = ets:delete(couch_dbs, DbName),
true = ets:delete(couch_dbs_pid_to_name, Pid),
- {noreply, db_closed(Server, Db#db.options)};
+ {noreply, db_closed(Server, Entry#entry.db_options)};
[] ->
{noreply, Server}
end;
diff --git a/src/couch/src/couch_server_int.hrl b/src/couch/src/couch_server_int.hrl
new file mode 100644
index 000000000..537a6abb9
--- /dev/null
+++ b/src/couch/src/couch_server_int.hrl
@@ -0,0 +1,23 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-record(entry, {
+ name,
+ db,
+ pid,
+ lock,
+ waiters,
+ req_type,
+ db_options,
+ start_time
+}).
diff --git a/src/couch/src/couch_users_db.erl b/src/couch/src/couch_users_db.erl
index 6f7b9af73..c7b41f1fc 100644
--- a/src/couch/src/couch_users_db.erl
+++ b/src/couch/src/couch_users_db.erl
@@ -39,8 +39,8 @@
% -> 404 // Not Found
% Else
% -> save_doc
-before_doc_update(Doc, #db{user_ctx = UserCtx} = Db) ->
- #user_ctx{name=Name} = UserCtx,
+before_doc_update(Doc, Db) ->
+ #user_ctx{name=Name} = couch_db:get_user_ctx(Db),
DocName = get_doc_name(Doc),
case (catch couch_db:check_is_admin(Db)) of
ok ->
@@ -108,8 +108,8 @@ after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, Db) ->
throw({forbidden,
<<"Only administrators can view design docs in the users database.">>})
end;
-after_doc_read(Doc, #db{user_ctx = UserCtx} = Db) ->
- #user_ctx{name=Name} = UserCtx,
+after_doc_read(Doc, Db) ->
+ #user_ctx{name=Name} = couch_db:get_user_ctx(Db),
DocName = get_doc_name(Doc),
case (catch couch_db:check_is_admin(Db)) of
ok ->
diff --git a/src/couch/src/couch_util.erl b/src/couch/src/couch_util.erl
index 4b848616d..42d10ec1e 100644
--- a/src/couch/src/couch_util.erl
+++ b/src/couch/src/couch_util.erl
@@ -199,7 +199,9 @@ json_apply_field({Key, NewValue}, [{OtherKey, OtherVal} | Headers], Acc) ->
json_apply_field({Key, NewValue}, [], Acc) ->
{[{Key, NewValue}|Acc]}.
-json_user_ctx(#db{name=ShardName, user_ctx=Ctx}) ->
+json_user_ctx(Db) ->
+ ShardName = couch_db:name(Db),
+ Ctx = couch_db:get_user_ctx(Db),
{[{<<"db">>, mem3:dbname(ShardName)},
{<<"name">>,Ctx#user_ctx.name},
{<<"roles">>,Ctx#user_ctx.roles}]}.
@@ -456,9 +458,7 @@ encode_doc_id(Id) ->
url_encode(Id).
-with_db(Db, Fun) when is_record(Db, db) ->
- Fun(Db);
-with_db(DbName, Fun) ->
+with_db(DbName, Fun) when is_binary(DbName) ->
case couch_db:open_int(DbName, [?ADMIN_CTX]) of
{ok, Db} ->
try
@@ -468,6 +468,13 @@ with_db(DbName, Fun) ->
end;
Else ->
throw(Else)
+ end;
+with_db(Db, Fun) ->
+ case couch_db:is_db(Db) of
+ true ->
+ Fun(Db);
+ false ->
+ erlang:error({invalid_db, Db})
end.
rfc1123_date() ->
diff --git a/src/couch/src/test_util.erl b/src/couch/src/test_util.erl
index e652dd9b3..8a05e8830 100644
--- a/src/couch/src/test_util.erl
+++ b/src/couch/src/test_util.erl
@@ -13,6 +13,8 @@
-module(test_util).
-include_lib("couch/include/couch_eunit.hrl").
+-include("couch_db.hrl").
+-include("couch_db_int.hrl").
-export([init_code_path/0]).
-export([source_file/1, build_file/1]).
@@ -32,6 +34,8 @@
-export([start/1, start/2, start/3, stop/1]).
+-export([fake_db/1]).
+
-record(test_context, {mocked = [], started = [], module}).
-define(DEFAULT_APPS,
@@ -230,6 +234,16 @@ stop(#test_context{mocked = Mocked, started = Apps}) ->
meck:unload(Mocked),
stop_applications(Apps).
+fake_db(Fields) ->
+ Indexes = lists:zip(
+ record_info(fields, db),
+ lists:seq(2, record_info(size, db))
+ ),
+ lists:foldl(fun({FieldName, Value}, Acc) ->
+ Idx = couch_util:get_value(FieldName, Indexes),
+ setelement(Idx, Acc, Value)
+ end, #db{}, Fields).
+
now_us() ->
{MegaSecs, Secs, MicroSecs} = now(),
(MegaSecs * 1000000 + Secs) * 1000000 + MicroSecs.
diff --git a/src/couch/test/couch_auth_cache_tests.erl b/src/couch/test/couch_auth_cache_tests.erl
index 6328c9b97..6916045c3 100644
--- a/src/couch/test/couch_auth_cache_tests.erl
+++ b/src/couch/test/couch_auth_cache_tests.erl
@@ -276,7 +276,7 @@ hash_password(Password) ->
shutdown_db(DbName) ->
{ok, AuthDb} = couch_db:open_int(DbName, [?ADMIN_CTX]),
ok = couch_db:close(AuthDb),
- couch_util:shutdown_sync(AuthDb#db.main_pid),
+ couch_util:shutdown_sync(couch_db:get_pid(AuthDb)),
ok = timer:sleep(1000).
get_doc_rev(DbName, UserName) ->
diff --git a/src/couch/test/couch_changes_tests.erl b/src/couch/test/couch_changes_tests.erl
index b2da3fea4..673f2faad 100644
--- a/src/couch/test/couch_changes_tests.erl
+++ b/src/couch/test/couch_changes_tests.erl
@@ -652,7 +652,7 @@ should_filter_by_user_ctx({DbName, _}) ->
]}),
ChArgs = #changes_args{filter = "app/valid"},
UserCtx = #user_ctx{name = <<"doc3">>, roles = []},
- DbRec = #db{name = DbName, user_ctx = UserCtx},
+ {ok, DbRec} = couch_db:clustered_db(DbName, UserCtx),
Req = {json_req, {[{
<<"userCtx">>, couch_util:json_user_ctx(DbRec)
}]}},
diff --git a/src/couch/test/couch_compress_tests.erl b/src/couch/test/couch_compress_tests.erl
index 6d6e6a792..addb9a0e2 100644
--- a/src/couch/test/couch_compress_tests.erl
+++ b/src/couch/test/couch_compress_tests.erl
@@ -72,3 +72,14 @@ is_compressed_test_() ->
?_assertError(invalid_compression,
couch_compress:is_compressed(?CORRUPT, snappy))
].
+
+uncompressed_size_test_() ->
+ [
+ ?_assertEqual(49, couch_compress:uncompressed_size(?NONE)),
+ ?_assertEqual(49, couch_compress:uncompressed_size(?DEFLATE)),
+ ?_assertEqual(49, couch_compress:uncompressed_size(?SNAPPY)),
+ ?_assertEqual(5, couch_compress:uncompressed_size(
+ couch_compress:compress(x, {deflate, 9}))),
+ ?_assertError(invalid_compression,
+ couch_compress:uncompressed_size(?CORRUPT))
+ ].
diff --git a/src/couch/test/couch_db_plugin_tests.erl b/src/couch/test/couch_db_plugin_tests.erl
index ea9b230b1..94dd3dfa5 100644
--- a/src/couch/test/couch_db_plugin_tests.erl
+++ b/src/couch/test/couch_db_plugin_tests.erl
@@ -43,6 +43,7 @@ data_providers() -> [].
data_subscriptions() -> [].
processes() -> [].
notify(_, _, _) -> ok.
+fake_db() -> element(2, couch_db:clustered_db(fake, totes_fake)).
setup() ->
couch_tests:setup([
@@ -133,33 +134,33 @@ validate_dbname_pass() ->
before_doc_update_match() ->
?assertMatch(
{true, [before_doc_update, doc]},
- couch_db_plugin:before_doc_update(#db{}, {true, [doc]})).
+ couch_db_plugin:before_doc_update(fake_db(), {true, [doc]})).
before_doc_update_no_match() ->
?assertMatch(
{false, [doc]},
- couch_db_plugin:before_doc_update(#db{}, {false, [doc]})).
+ couch_db_plugin:before_doc_update(fake_db(), {false, [doc]})).
before_doc_update_throw() ->
?assertThrow(
before_doc_update,
- couch_db_plugin:before_doc_update(#db{}, {fail, [doc]})).
+ couch_db_plugin:before_doc_update(fake_db(), {fail, [doc]})).
after_doc_read_match() ->
?assertMatch(
{true, [after_doc_read, doc]},
- couch_db_plugin:after_doc_read(#db{}, {true, [doc]})).
+ couch_db_plugin:after_doc_read(fake_db(), {true, [doc]})).
after_doc_read_no_match() ->
?assertMatch(
{false, [doc]},
- couch_db_plugin:after_doc_read(#db{}, {false, [doc]})).
+ couch_db_plugin:after_doc_read(fake_db(), {false, [doc]})).
after_doc_read_throw() ->
?assertThrow(
after_doc_read,
- couch_db_plugin:after_doc_read(#db{}, {fail, [doc]})).
+ couch_db_plugin:after_doc_read(fake_db(), {fail, [doc]})).
validate_docid_match() ->
diff --git a/src/couch/test/couch_server_tests.erl b/src/couch/test/couch_server_tests.erl
index c8f8381d7..c52b3f6b0 100644
--- a/src/couch/test/couch_server_tests.erl
+++ b/src/couch/test/couch_server_tests.erl
@@ -32,8 +32,9 @@ setup(_) ->
setup().
teardown(Db) ->
+ FilePath = couch_db:get_filepath(Db),
(catch couch_db:close(Db)),
- (catch file:delete(Db#db.filepath)).
+ (catch file:delete(FilePath)).
teardown(rename, Db) ->
config:set("couchdb", "enable_database_recovery", "false", false),
@@ -61,7 +62,9 @@ make_test_case(Mod, Funs) ->
{foreachx, fun setup/1, fun teardown/2, [{Mod, Fun} || Fun <- Funs]}
}.
-should_rename_on_delete(_, #db{filepath = Origin, name = DbName}) ->
+should_rename_on_delete(_, Db) ->
+ DbName = couch_db:name(Db),
+ Origin = couch_db:get_filepath(Db),
?_test(begin
?assert(filelib:is_regular(Origin)),
?assertMatch(ok, couch_server:delete(DbName, [])),
@@ -74,7 +77,9 @@ should_rename_on_delete(_, #db{filepath = Origin, name = DbName}) ->
?assert(filelib:is_regular(Renamed))
end).
-should_delete(_, #db{filepath = Origin, name = DbName}) ->
+should_delete(_, Db) ->
+ DbName = couch_db:name(Db),
+ Origin = couch_db:get_filepath(Db),
?_test(begin
?assert(filelib:is_regular(Origin)),
?assertMatch(ok, couch_server:delete(DbName, [])),
diff --git a/src/couch/test/couchdb_compaction_daemon_tests.erl b/src/couch/test/couchdb_compaction_daemon_tests.erl
index 856a53d05..0d7a46862 100644
--- a/src/couch/test/couchdb_compaction_daemon_tests.erl
+++ b/src/couch/test/couchdb_compaction_daemon_tests.erl
@@ -175,7 +175,7 @@ update(DbName) ->
lists:foreach(fun(_) ->
Doc = couch_doc:from_json_obj({[{<<"_id">>, couch_uuids:new()}]}),
{ok, _} = couch_db:update_docs(Db, [Doc]),
- query_view(Db#db.name)
+ query_view(couch_db:name(Db))
end, lists:seq(1, 200)),
couch_db:close(Db).
@@ -213,7 +213,7 @@ spawn_compaction_monitor(DbName) ->
{Pid, Ref} = spawn_monitor(fun() ->
DaemonPid = whereis(couch_compaction_daemon),
DbPid = couch_util:with_db(DbName, fun(Db) ->
- Db#db.main_pid
+ couch_db:get_pid(Db)
end),
{ok, ViewPid} = couch_index_server:get_index(couch_mrview_index,
DbName, <<"_design/foo">>),
diff --git a/src/couch/test/couchdb_file_compression_tests.erl b/src/couch/test/couchdb_file_compression_tests.erl
index 09fead582..8f0fe5bf1 100644
--- a/src/couch/test/couchdb_file_compression_tests.erl
+++ b/src/couch/test/couchdb_file_compression_tests.erl
@@ -157,7 +157,7 @@ compare_compression_methods(DbName) ->
?assert(DbSizeDeflate1 > DbSizeDeflate9),
?assert(ViewSizeDeflate1 > ViewSizeDeflate9),
- ?assert(ExternalSizePreCompact =:= ExternalSizeNone),
+ ?assert(ExternalSizePreCompact >= ExternalSizeNone),
?assert(ExternalSizeNone =:= ExternalSizeSnappy),
?assert(ExternalSizeNone =:= ExternalSizeDeflate9),
?assert(ViewExternalSizeNone =:= ViewExternalSizeSnappy),
diff --git a/src/couch/test/couchdb_views_tests.erl b/src/couch/test/couchdb_views_tests.erl
index ae4029513..c0505f3db 100644
--- a/src/couch/test/couchdb_views_tests.erl
+++ b/src/couch/test/couchdb_views_tests.erl
@@ -348,11 +348,11 @@ couchdb_1283() ->
]}),
{ok, _} = couch_db:update_doc(MDb1, DDoc, []),
ok = populate_db(MDb1, 100, 100),
- query_view(MDb1#db.name, "foo", "foo"),
+ query_view(couch_db:name(MDb1), "foo", "foo"),
ok = couch_db:close(MDb1),
{ok, Pid} = couch_index_server:get_index(
- couch_mrview_index, MDb1#db.name, <<"_design/foo">>),
+ couch_mrview_index, couch_db:name(MDb1), <<"_design/foo">>),
% Start and pause compacton
WaitRef = erlang:make_ref(),
@@ -522,7 +522,8 @@ view_cleanup(DbName) ->
count_users(DbName) ->
{ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
- {monitored_by, Monitors} = erlang:process_info(Db#db.main_pid, monitored_by),
+ DbPid = couch_db:get_pid(Db),
+ {monitored_by, Monitors} = process_info(DbPid, monitored_by),
CouchFiles = [P || P <- Monitors, couch_file:process_info(P) =/= undefined],
ok = couch_db:close(Db),
length(lists:usort(Monitors) -- [self() | CouchFiles]).
@@ -546,9 +547,10 @@ backup_db_file(DbName) ->
restore_backup_db_file(DbName) ->
DbDir = config:get("couchdb", "database_dir"),
- {ok, #db{main_pid = UpdaterPid} = Db} = couch_db:open_int(DbName, []),
+ {ok, Db} = couch_db:open_int(DbName, []),
ok = couch_db:close(Db),
- exit(UpdaterPid, shutdown),
+ DbPid = couch_db:get_pid(Db),
+ exit(DbPid, shutdown),
DbFile = filename:join([DbDir, ?b2l(DbName) ++ ".couch"]),
ok = file:delete(DbFile),
@@ -556,9 +558,13 @@ restore_backup_db_file(DbName) ->
test_util:wait(fun() ->
case couch_server:open(DbName, [{timeout, ?TIMEOUT}]) of
- {ok, #db{main_pid = UpdaterPid}} -> wait;
- {ok, _} -> ok;
- Else -> Else
+ {ok, WaitDb} ->
+ case couch_db:get_pid(WaitDb) == DbPid of
+ true -> wait;
+ false -> ok
+ end;
+ Else ->
+ Else
end
end, ?TIMEOUT, ?DELAY).
@@ -576,7 +582,8 @@ wait_db_compact_done(_DbName, 0) ->
wait_db_compact_done(DbName, N) ->
{ok, Db} = couch_db:open_int(DbName, []),
ok = couch_db:close(Db),
- case is_pid(Db#db.compactor_pid) of
+ CompactorPid = couch_db:get_compactor_pid(Db),
+ case is_pid(CompactorPid) of
false ->
ok;
true ->
diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl
index 8225a90a3..a33c1e490 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -60,11 +60,9 @@ validate(DbName, DDoc) ->
lists:foreach(ValidateFun, EnabledIndexers).
-get_index(Module, #db{name = <<"shards/", _/binary>> = DbName}, DDoc) ->
- case is_record(DDoc, doc) of
- true -> get_index(Module, DbName, DDoc, nil);
- false -> get_index(Module, DbName, DDoc)
- end;
+get_index(Module, <<"shards/", _/binary>> = DbName, DDoc)
+ when is_record(DDoc, doc) ->
+ get_index(Module, DbName, DDoc, nil);
get_index(Module, <<"shards/", _/binary>> = DbName, DDoc) ->
{Pid, Ref} = spawn_monitor(fun() ->
exit(fabric:open_doc(mem3:dbname(DbName), DDoc, [ejson_body, ?ADMIN_CTX]))
@@ -77,9 +75,10 @@ get_index(Module, <<"shards/", _/binary>> = DbName, DDoc) ->
erlang:demonitor(Ref, [flush]),
{error, timeout}
end;
-
-get_index(Module, DbName, DDoc) ->
- get_index(Module, DbName, DDoc, nil).
+get_index(Module, DbName, DDoc) when is_binary(DbName) ->
+ get_index(Module, DbName, DDoc, nil);
+get_index(Module, Db, DDoc) ->
+ get_index(Module, couch_db:name(Db), DDoc).
get_index(Module, DbName, DDoc, Fun) when is_binary(DbName) ->
diff --git a/src/couch_index/src/couch_index_util.erl b/src/couch_index/src/couch_index_util.erl
index 5694641ca..dcb33b5b0 100644
--- a/src/couch_index/src/couch_index_util.erl
+++ b/src/couch_index/src/couch_index_util.erl
@@ -25,7 +25,7 @@ root_dir() ->
index_dir(Module, DbName) when is_binary(DbName) ->
DbDir = "." ++ binary_to_list(DbName) ++ "_design",
filename:join([root_dir(), DbDir, Module]);
-index_dir(Module, #db{}=Db) ->
+index_dir(Module, Db) ->
index_dir(Module, couch_db:name(Db)).
diff --git a/src/couch_index/test/couch_index_compaction_tests.erl b/src/couch_index/test/couch_index_compaction_tests.erl
index 0048b338e..062be872a 100644
--- a/src/couch_index/test/couch_index_compaction_tests.erl
+++ b/src/couch_index/test/couch_index_compaction_tests.erl
@@ -25,7 +25,8 @@ setup() ->
?assertNot(is_opened(Db)),
{Db, IndexerPid}.
-fake_index(#db{name = DbName} = Db) ->
+fake_index(Db) ->
+ DbName = couch_db:name(Db),
ok = meck:new([test_index], [non_strict]),
ok = meck:expect(test_index, init, ['_', '_'], {ok, 10}),
ok = meck:expect(test_index, open, fun(_Db, State) ->
diff --git a/src/couch_index/test/couch_index_ddoc_updated_tests.erl b/src/couch_index/test/couch_index_ddoc_updated_tests.erl
index f42c9a29a..d1bbc43d2 100644
--- a/src/couch_index/test/couch_index_ddoc_updated_tests.erl
+++ b/src/couch_index/test/couch_index_ddoc_updated_tests.erl
@@ -91,7 +91,7 @@ check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) ->
% assert that all index processes exit after ddoc updated
ok = meck:reset(test_index),
couch_index_server:handle_db_event(
- DbShard#db.name, {ddoc_updated, DDocID}, {st, ""}),
+ couch_db:name(DbShard), {ddoc_updated, DDocID}, {st, ""}),
ok = meck:wait(N, test_index, init, ['_', '_'], 5000),
IndexesAfter = get_indexes_by_ddoc(DDocID, 0),
diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl
index 11c209b43..07e36687d 100644
--- a/src/couch_mrview/src/couch_mrview.erl
+++ b/src/couch_mrview/src/couch_mrview.erl
@@ -364,15 +364,12 @@ get_view_info(Db, DDoc, VName) ->
%% @doc refresh a view index
-refresh(#db{name=DbName}, DDoc) ->
- refresh(DbName, DDoc);
-
-refresh(Db, DDoc) ->
- UpdateSeq = couch_util:with_db(Db, fun(WDb) ->
+refresh(DbName, DDoc) when is_binary(DbName)->
+ UpdateSeq = couch_util:with_db(DbName, fun(WDb) ->
couch_db:get_update_seq(WDb)
end),
- case couch_index_server:get_index(couch_mrview_index, Db, DDoc) of
+ case couch_index_server:get_index(couch_mrview_index, DbName, DDoc) of
{ok, Pid} ->
case catch couch_index:get_state(Pid, UpdateSeq) of
{ok, _} -> ok;
@@ -380,7 +377,10 @@ refresh(Db, DDoc) ->
end;
Error ->
{error, Error}
- end.
+ end;
+
+refresh(Db, DDoc) ->
+ refresh(couch_db:name(Db), DDoc).
compact(Db, DDoc) ->
compact(Db, DDoc, []).
@@ -668,7 +668,7 @@ get_reduce_fun(#mrargs{extra = Extra}) ->
end.
-get_total_rows(#db{} = Db, #mrargs{extra = Extra}) ->
+get_total_rows(Db, #mrargs{extra = Extra}) ->
case couch_util:get_value(namespace, Extra) of
<<"_local">> ->
null;
@@ -678,7 +678,7 @@ get_total_rows(#db{} = Db, #mrargs{extra = Extra}) ->
end.
-get_update_seq(#db{} = Db, #mrargs{extra = Extra}) ->
+get_update_seq(Db, #mrargs{extra = Extra}) ->
case couch_util:get_value(namespace, Extra) of
<<"_local">> ->
null;
diff --git a/src/couch_mrview/src/couch_mrview_compactor.erl b/src/couch_mrview/src/couch_mrview_compactor.erl
index c1b2fbc21..e9be89c71 100644
--- a/src/couch_mrview/src/couch_mrview_compactor.erl
+++ b/src/couch_mrview/src/couch_mrview_compactor.erl
@@ -53,8 +53,7 @@ compact(State) ->
{ok, Fd} = couch_mrview_util:open_file(CompactFName),
ESt = couch_mrview_util:reset_index(Db, Fd, State),
- {ok, DbReduce} = couch_btree:full_reduce(Db#db.id_tree),
- Count = element(1, DbReduce),
+ {ok, Count} = couch_db:get_doc_count(Db),
{ESt, Count}
end),
diff --git a/src/couch_mrview/src/couch_mrview_http.erl b/src/couch_mrview/src/couch_mrview_http.erl
index a94f48df9..9ad50eeef 100644
--- a/src/couch_mrview/src/couch_mrview_http.erl
+++ b/src/couch_mrview/src/couch_mrview_http.erl
@@ -103,11 +103,11 @@ handle_view_changes_req(#httpd{path_parts=[_,<<"_design">>,DDocName,<<"_view_cha
handle_view_req(#httpd{method='GET',
path_parts=[_, _, DDocName, _, VName, <<"_info">>]}=Req,
Db, _DDoc) ->
-
+ DbName = couch_db:name(Db),
DDocId = <<"_design/", DDocName/binary >>,
- {ok, Info} = couch_mrview:get_view_info(Db#db.name, DDocId, VName),
+ {ok, Info} = couch_mrview:get_view_info(DbName, DDocId, VName),
- FinalInfo = [{db_name, Db#db.name},
+ FinalInfo = [{db_name, DbName},
{ddoc, DDocId},
{view, VName}] ++ Info,
chttpd:send_json(Req, 200, {FinalInfo});
@@ -212,7 +212,7 @@ is_restricted(Db, _) ->
couch_db:is_system_db(Db).
is_public_fields_configured(Db) ->
- DbName = ?b2l(Db#db.name),
+ DbName = ?b2l(couch_db:name(Db)),
case config:get("couch_httpd_auth", "authentication_db", "_users") of
DbName ->
UsersDbPublic = config:get("couch_httpd_auth", "users_db_public", "false"),
@@ -237,7 +237,7 @@ do_all_docs_req(Req, Db, Keys, NS) ->
{ok, Resp} = couch_httpd:etag_maybe(Req, fun() ->
Max = chttpd:chunked_response_buffer_size(),
VAcc0 = #vacc{db=Db, req=Req, threshold=Max},
- DbName = ?b2l(Db#db.name),
+ DbName = ?b2l(couch_db:name(Db)),
UsersDbName = config:get("couch_httpd_auth",
"authentication_db",
"_users"),
diff --git a/src/couch_mrview/src/couch_mrview_show.erl b/src/couch_mrview/src/couch_mrview_show.erl
index 3a602ad21..2411c2ca2 100644
--- a/src/couch_mrview/src/couch_mrview_show.erl
+++ b/src/couch_mrview/src/couch_mrview_show.erl
@@ -364,13 +364,17 @@ json_apply_field({Key, NewValue}, [], Acc) ->
% This loads the db info if we have a fully loaded db record, but we might not
% have the db locally on this node, so then load the info through fabric.
-json_req_obj(Req, #db{main_pid=Pid}=Db) when is_pid(Pid) ->
- chttpd_external:json_req_obj(Req, Db);
json_req_obj(Req, Db) ->
- % use a separate process because we're already in a receive loop, and
- % json_req_obj calls fabric:get_db_info()
- spawn_monitor(fun() -> exit(chttpd_external:json_req_obj(Req, Db)) end),
- receive {'DOWN', _, _, _, JsonReq} -> JsonReq end.
+ case couch_db:is_clustered(Db) of
+ true ->
+ % use a separate process because we're already in a receive loop,
+ % and json_req_obj calls fabric:get_db_info()
+ JRO = fun() -> exit(chttpd_external:json_req_obj(Req, Db)) end,
+ spawn_monitor(JRO),
+ receive {'DOWN', _, _, _, JsonReq} -> JsonReq end;
+ false ->
+ chttpd_external:json_req_obj(Req, Db)
+ end.
last_chunk(Req, undefined) ->
chttpd:send_response(Req, 200, [], <<"">>);
diff --git a/src/couch_mrview/test/couch_mrview_all_docs_tests.erl b/src/couch_mrview/test/couch_mrview_all_docs_tests.erl
index 5e352797f..bf8eb7e5b 100644
--- a/src/couch_mrview/test/couch_mrview_all_docs_tests.erl
+++ b/src/couch_mrview/test/couch_mrview_all_docs_tests.erl
@@ -25,7 +25,7 @@ setup() ->
teardown(Db) ->
couch_db:close(Db),
- couch_server:delete(Db#db.name, [?ADMIN_CTX]),
+ couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]),
ok.
diff --git a/src/couch_mrview/test/couch_mrview_changes_since_tests.erl b/src/couch_mrview/test/couch_mrview_changes_since_tests.erl
index 1e31b3968..d670e109b 100644
--- a/src/couch_mrview/test/couch_mrview_changes_since_tests.erl
+++ b/src/couch_mrview/test/couch_mrview_changes_since_tests.erl
@@ -19,7 +19,7 @@
teardown(Db) ->
couch_db:close(Db),
- couch_server:delete(Db#db.name, [?ADMIN_CTX]),
+ couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]),
ok.
changes_since_basic_test_() ->
diff --git a/src/couch_mrview/test/couch_mrview_collation_tests.erl b/src/couch_mrview/test/couch_mrview_collation_tests.erl
index c4a714d1e..5c8cb54b1 100644
--- a/src/couch_mrview/test/couch_mrview_collation_tests.erl
+++ b/src/couch_mrview/test/couch_mrview_collation_tests.erl
@@ -64,7 +64,7 @@ setup() ->
teardown(Db) ->
couch_db:close(Db),
- couch_server:delete(Db#db.name, [?ADMIN_CTX]),
+ couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]),
ok.
diff --git a/src/couch_mrview/test/couch_mrview_compact_tests.erl b/src/couch_mrview/test/couch_mrview_compact_tests.erl
index 40877c80e..7664becdc 100644
--- a/src/couch_mrview/test/couch_mrview_compact_tests.erl
+++ b/src/couch_mrview/test/couch_mrview_compact_tests.erl
@@ -26,7 +26,7 @@ setup() ->
teardown(Db) ->
meck:unload(),
couch_db:close(Db),
- couch_server:delete(Db#db.name, [?ADMIN_CTX]),
+ couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]),
ok.
diff --git a/src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl b/src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl
index cc3844dbd..4310157eb 100644
--- a/src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl
+++ b/src/couch_mrview/test/couch_mrview_ddoc_updated_tests.erl
@@ -60,7 +60,7 @@ setup() ->
teardown(Db) ->
meck:unload(couch_index_updater),
couch_db:close(Db),
- couch_server:delete(Db#db.name, [?ADMIN_CTX]),
+ couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]),
ok.
diff --git a/src/couch_mrview/test/couch_mrview_ddoc_validation_tests.erl b/src/couch_mrview/test/couch_mrview_ddoc_validation_tests.erl
index c2038ddfb..ce2be8904 100644
--- a/src/couch_mrview/test/couch_mrview_ddoc_validation_tests.erl
+++ b/src/couch_mrview/test/couch_mrview_ddoc_validation_tests.erl
@@ -23,7 +23,7 @@ setup() ->
teardown(Db) ->
couch_db:close(Db),
- couch_server:delete(Db#db.name, [?ADMIN_CTX]),
+ couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]),
ok.
ddoc_validation_test_() ->
diff --git a/src/couch_mrview/test/couch_mrview_index_changes_tests.erl b/src/couch_mrview/test/couch_mrview_index_changes_tests.erl
index 8f0c296aa..2701e0c22 100644
--- a/src/couch_mrview/test/couch_mrview_index_changes_tests.erl
+++ b/src/couch_mrview/test/couch_mrview_index_changes_tests.erl
@@ -22,7 +22,7 @@ setup() ->
teardown(Db) ->
couch_db:close(Db),
- couch_server:delete(Db#db.name, [?ADMIN_CTX]),
+ couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]),
ok.
changes_index_test() ->
diff --git a/src/couch_mrview/test/couch_mrview_index_info_tests.erl b/src/couch_mrview/test/couch_mrview_index_info_tests.erl
index 3f88972ea..c994df9d3 100644
--- a/src/couch_mrview/test/couch_mrview_index_info_tests.erl
+++ b/src/couch_mrview/test/couch_mrview_index_info_tests.erl
@@ -28,7 +28,7 @@ setup() ->
teardown({Db, _}) ->
couch_db:close(Db),
- couch_server:delete(Db#db.name, [?ADMIN_CTX]),
+ couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]),
ok.
diff --git a/src/couch_mrview/test/couch_mrview_local_docs_tests.erl b/src/couch_mrview/test/couch_mrview_local_docs_tests.erl
index f18f66e4e..c96b98875 100644
--- a/src/couch_mrview/test/couch_mrview_local_docs_tests.erl
+++ b/src/couch_mrview/test/couch_mrview_local_docs_tests.erl
@@ -25,7 +25,7 @@ setup() ->
teardown(Db) ->
couch_db:close(Db),
- couch_server:delete(Db#db.name, [?ADMIN_CTX]),
+ couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]),
ok.
diff --git a/src/couch_mrview/test/couch_mrview_map_views_tests.erl b/src/couch_mrview/test/couch_mrview_map_views_tests.erl
index 3a199288d..229af183d 100644
--- a/src/couch_mrview/test/couch_mrview_map_views_tests.erl
+++ b/src/couch_mrview/test/couch_mrview_map_views_tests.erl
@@ -24,7 +24,7 @@ setup() ->
teardown(Db) ->
couch_db:close(Db),
- couch_server:delete(Db#db.name, [?ADMIN_CTX]),
+ couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]),
ok.
diff --git a/src/couch_mrview/test/couch_mrview_red_views_tests.erl b/src/couch_mrview/test/couch_mrview_red_views_tests.erl
index 310078597..b83686113 100644
--- a/src/couch_mrview/test/couch_mrview_red_views_tests.erl
+++ b/src/couch_mrview/test/couch_mrview_red_views_tests.erl
@@ -24,7 +24,7 @@ setup() ->
teardown(Db) ->
couch_db:close(Db),
- couch_server:delete(Db#db.name, [?ADMIN_CTX]),
+ couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]),
ok.
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index a2ef60fa3..ab8eb7f29 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -60,11 +60,11 @@
db_uri(#httpdb{url = Url}) ->
couch_util:url_strip_password(Url);
-db_uri(#db{name = Name}) ->
- db_uri(Name);
+db_uri(DbName) when is_binary(DbName) ->
+ ?b2l(DbName);
-db_uri(DbName) ->
- ?b2l(DbName).
+db_uri(Db) ->
+ db_uri(couch_db:name(Db)).
db_open(Db, Options) ->
@@ -153,10 +153,12 @@ get_db_info(#httpdb{} = Db) ->
fun(200, _, {Props}) ->
{ok, Props}
end);
-get_db_info(#db{name = DbName, user_ctx = UserCtx}) ->
- {ok, Db} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
- {ok, Info} = couch_db:get_db_info(Db),
- couch_db:close(Db),
+get_db_info(Db) ->
+ DbName = couch_db:name(Db),
+ UserCtx = couch_db:get_user_ctx(Db),
+ {ok, InfoDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
+ {ok, Info} = couch_db:get_db_info(InfoDb),
+ couch_db:close(InfoDb),
{ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}.
@@ -176,8 +178,10 @@ get_pending_count(#httpdb{} = Db, Seq) ->
send_req(Db, Options, fun(200, _, {Props}) ->
{ok, couch_util:get_value(<<"pending">>, Props, null)}
end);
-get_pending_count(#db{name=DbName}=Db, Seq) when is_number(Seq) ->
- {ok, CountDb} = couch_db:open(DbName, [{user_ctx, Db#db.user_ctx}]),
+get_pending_count(Db, Seq) when is_number(Seq) ->
+ DbName = couch_db:name(Db),
+ UserCtx = couch_db:get_user_ctx(Db),
+ {ok, CountDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
Pending = couch_db:count_changes_since(CountDb, Seq),
couch_db:close(CountDb),
{ok, Pending}.
@@ -189,7 +193,8 @@ get_view_info(#httpdb{} = Db, DDocId, ViewName) ->
{VInfo} = couch_util:get_value(<<"view_index">>, Props, {[]}),
{ok, VInfo}
end);
-get_view_info(#db{name = DbName}, DDocId, ViewName) ->
+get_view_info(Db, DDocId, ViewName) ->
+ DbName = couch_db:name(Db),
{ok, VInfo} = couch_mrview:get_view_info(DbName, DDocId, ViewName),
{ok, [{couch_util:to_binary(K), V} || {K, V} <- VInfo]}.
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.hrl b/src/couch_replicator/src/couch_replicator_api_wrap.hrl
index fc940545a..d2e0fdff5 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.hrl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.hrl
@@ -21,7 +21,7 @@
],
timeout, % milliseconds
ibrowse_options = [],
- retries = 10,
+ retries = 5,
wait = 250, % milliseconds
httpc_pool = nil,
http_connections,
diff --git a/src/couch_replicator/src/couch_replicator_clustering.erl b/src/couch_replicator/src/couch_replicator_clustering.erl
index 7618f24d6..ed01465d5 100644
--- a/src/couch_replicator/src/couch_replicator_clustering.erl
+++ b/src/couch_replicator/src/couch_replicator_clustering.erl
@@ -28,6 +28,7 @@
-behaviour(gen_server).
-behaviour(config_listener).
+-behaviour(mem3_cluster).
-export([
start_link/0
@@ -55,6 +56,12 @@
handle_config_terminate/3
]).
+% mem3_cluster callbacks
+-export([
+ cluster_stable/1,
+ cluster_unstable/1
+]).
+
-include_lib("couch/include/couch_db.hrl").
-include_lib("mem3/include/mem3.hrl").
@@ -63,11 +70,8 @@
-define(RELISTEN_DELAY, 5000).
-record(state, {
- start_time :: erlang:timestamp(),
- last_change :: erlang:timestamp(),
- period = ?DEFAULT_QUIET_PERIOD :: non_neg_integer(),
- start_period = ?DEFAULT_START_PERIOD :: non_neg_integer(),
- timer :: reference()
+ mem3_cluster_pid :: pid(),
+ cluster_stable :: boolean()
}).
@@ -115,64 +119,55 @@ link_cluster_event_listener(Mod, Fun, Args)
Pid.
+% Mem3 cluster callbacks
+
+cluster_unstable(Server) ->
+ couch_replicator_notifier:notify({cluster, unstable}),
+ couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
+ couch_log:notice("~s : cluster unstable", [?MODULE]),
+ gen_server:cast(Server, cluster_unstable),
+ Server.
+
+cluster_stable(Server) ->
+ couch_replicator_notifier:notify({cluster, stable}),
+ couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
+ couch_log:notice("~s : cluster stable", [?MODULE]),
+ gen_server:cast(Server, cluster_stable),
+ Server.
+
+
% gen_server callbacks
init([]) ->
- net_kernel:monitor_nodes(true),
ok = config:listen_for_changes(?MODULE, nil),
Period = abs(config:get_integer("replicator", "cluster_quiet_period",
?DEFAULT_QUIET_PERIOD)),
StartPeriod = abs(config:get_integer("replicator", "cluster_start_period",
?DEFAULT_START_PERIOD)),
- couch_log:debug("Initialized clustering gen_server ~w", [self()]),
couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
- {ok, #state{
- start_time = os:timestamp(),
- last_change = os:timestamp(),
- period = Period,
- start_period = StartPeriod,
- timer = new_timer(StartPeriod)
- }}.
+ {ok, Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(), StartPeriod,
+ Period),
+ {ok, #state{mem3_cluster_pid = Mem3Cluster, cluster_stable = false}}.
terminate(_Reason, _State) ->
ok.
-handle_call(is_stable, _From, State) ->
- {reply, is_stable(State), State}.
+handle_call(is_stable, _From, #state{cluster_stable = IsStable} = State) ->
+ {reply, IsStable, State}.
-handle_cast({set_period, QuietPeriod}, State) ->
- {noreply, State#state{period = QuietPeriod}}.
+handle_cast({set_period, Period}, #state{mem3_cluster_pid = Pid} = State) ->
+ ok = mem3_cluster:set_period(Pid, Period),
+ {noreply, State};
+handle_cast(cluster_stable, State) ->
+ {noreply, State#state{cluster_stable = true}};
-handle_info({nodeup, Node}, State) ->
- Timer = new_timer(interval(State)),
- couch_replicator_notifier:notify({cluster, unstable}),
- couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
- couch_log:notice("~s : nodeup ~s, cluster unstable", [?MODULE, Node]),
- {noreply, State#state{last_change = os:timestamp(), timer = Timer}};
+handle_cast(cluster_unstable, State) ->
+ {noreply, State#state{cluster_stable = false}}.
-handle_info({nodedown, Node}, State) ->
- Timer = new_timer(interval(State)),
- couch_replicator_notifier:notify({cluster, unstable}),
- couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
- couch_log:notice("~s : nodedown ~s, cluster unstable", [?MODULE, Node]),
- {noreply, State#state{last_change = os:timestamp(), timer = Timer}};
-
-handle_info(stability_check, State) ->
- erlang:cancel_timer(State#state.timer),
- case is_stable(State) of
- true ->
- couch_replicator_notifier:notify({cluster, stable}),
- couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
- couch_log:notice("~s : publish cluster `stable` event", [?MODULE]),
- {noreply, State};
- false ->
- Timer = new_timer(interval(State)),
- {noreply, State#state{timer = Timer}}
- end;
handle_info(restart_config_listener, State) ->
ok = config:listen_for_changes(?MODULE, nil),
@@ -185,41 +180,6 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
--spec new_timer(non_neg_integer()) -> reference().
-new_timer(IntervalSec) ->
- erlang:send_after(IntervalSec * 1000, self(), stability_check).
-
-
-% For the first Period seconds after node boot we check cluster stability every
-% StartPeriod seconds. Once the initial Period seconds have passed we continue
-% to monitor once every Period seconds
--spec interval(#state{}) -> non_neg_integer().
-interval(#state{period = Period, start_period = StartPeriod,
- start_time = T0}) ->
- case now_diff_sec(T0) > Period of
- true ->
- % Normal operation
- Period;
- false ->
- % During startup
- StartPeriod
- end.
-
-
--spec is_stable(#state{}) -> boolean().
-is_stable(#state{last_change = TS} = State) ->
- now_diff_sec(TS) > interval(State).
-
-
--spec now_diff_sec(erlang:timestamp()) -> non_neg_integer().
-now_diff_sec(Time) ->
- case timer:now_diff(os:timestamp(), Time) of
- USec when USec < 0 ->
- 0;
- USec when USec >= 0 ->
- USec / 1000000
- end.
-
handle_config_change("replicator", "cluster_quiet_period", V, _, S) ->
ok = gen_server:cast(?MODULE, {set_period, list_to_integer(V)}),
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl
index a49d692d9..9d844b9e7 100644
--- a/src/couch_replicator/src/couch_replicator_docs.erl
+++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -121,7 +121,7 @@ update_error(#rep{db_name = DbName, doc_id = DocId, id = RepId}, Error) ->
ok.
--spec ensure_rep_db_exists() -> {ok, #db{}}.
+-spec ensure_rep_db_exists() -> {ok, Db::any()}.
ensure_rep_db_exists() ->
Db = case couch_db:open_int(?REP_DB_NAME, [?CTX, sys_db,
nologifmissing]) of
@@ -466,7 +466,7 @@ make_options(Props) ->
DefBatchSize = config:get("replicator", "worker_batch_size", "500"),
DefConns = config:get("replicator", "http_connections", "20"),
DefTimeout = config:get("replicator", "connection_timeout", "30000"),
- DefRetries = config:get("replicator", "retries_per_request", "10"),
+ DefRetries = config:get("replicator", "retries_per_request", "5"),
UseCheckpoints = config:get("replicator", "use_checkpoints", "true"),
DefCheckpointInterval = config:get("replicator", "checkpoint_interval",
"30000"),
@@ -621,11 +621,14 @@ ssl_verify_options(false) ->
[{verify, verify_none}].
--spec before_doc_update(#doc{}, #db{}) -> #doc{}.
+-spec before_doc_update(#doc{}, Db::any()) -> #doc{}.
before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
Doc;
-before_doc_update(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
- #user_ctx{roles = Roles, name = Name} = UserCtx,
+before_doc_update(#doc{body = {Body}} = Doc, Db) ->
+ #user_ctx{
+ roles = Roles,
+ name = Name
+ } = couch_db:get_user_ctx(Db),
case lists:member(<<"_replicator">>, Roles) of
true ->
Doc;
@@ -649,11 +652,11 @@ before_doc_update(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
end.
--spec after_doc_read(#doc{}, #db{}) -> #doc{}.
+-spec after_doc_read(#doc{}, Db::any()) -> #doc{}.
after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
Doc;
-after_doc_read(#doc{body = {Body}} = Doc, #db{user_ctx=UserCtx} = Db) ->
- #user_ctx{name = Name} = UserCtx,
+after_doc_read(#doc{body = {Body}} = Doc, Db) ->
+ #user_ctx{name = Name} = couch_db:get_user_ctx(Db),
case (catch couch_db:check_is_admin(Db)) of
ok ->
Doc;
diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
index 62cfdf267..e7067622b 100644
--- a/src/couch_replicator/src/couch_replicator_ids.erl
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -78,7 +78,11 @@ replication_id(#rep{user_ctx = UserCtx} = Rep, 1) ->
-spec convert([_] | binary() | {string(), string()}) -> {string(), string()}.
convert(Id) when is_list(Id) ->
convert(?l2b(Id));
-convert(Id) when is_binary(Id) ->
+convert(Id0) when is_binary(Id0) ->
+ % Spaces can result from mochiweb incorrectly unquoting + characters from
+ % the URL path. So undo the incorrect parsing here to avoid forcing
+ % users to url encode + characters.
+ Id = binary:replace(Id0, <<" ">>, <<"+">>, [global]),
lists:splitwith(fun(Char) -> Char =/= $+ end, ?b2l(Id));
convert({BaseId, Ext} = Id) when is_list(BaseId), is_list(Ext) ->
Id.
@@ -222,6 +226,16 @@ get_non_default_port(_Schema, Port) ->
-include_lib("eunit/include/eunit.hrl").
+
+replication_id_convert_test_() ->
+ [?_assertEqual(Expected, convert(Id)) || {Expected, Id} <- [
+ {{"abc", ""}, "abc"},
+ {{"abc", ""}, <<"abc">>},
+ {{"abc", "+x+y"}, <<"abc+x+y">>},
+ {{"abc", "+x+y"}, {"abc", "+x+y"}},
+ {{"abc", "+x+y"}, <<"abc x y">>}
+ ]].
+
http_v4_endpoint_test_() ->
[?_assertMatch({remote, User, Host, Port, Path, HeadersNoAuth, undefined},
get_v4_endpoint(nil, #httpdb{url = Url, headers = Headers})) ||
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 6a5722521..e7ce576f4 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -95,8 +95,6 @@ start_link(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
case gen_server:start_link(ServerName, ?MODULE, Rep, []) of
{ok, Pid} ->
- couch_log:notice("starting new replication `~s` at ~p (`~s` -> `~s`)",
- [RepChildId, Pid, Source, Target]),
{ok, Pid};
{error, Reason} ->
couch_log:warning("failed to start replication `~s` (`~s` -> `~s`)",
@@ -184,24 +182,7 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
% cancel_replication/1) and then start the replication again, but this is
% unfortunately not immune to race conditions.
- couch_log:notice("Replication `~p` is using:~n"
- "~c~p worker processes~n"
- "~ca worker batch size of ~p~n"
- "~c~p HTTP connections~n"
- "~ca connection timeout of ~p milliseconds~n"
- "~c~p retries per request~n"
- "~csocket options are: ~s~s",
- [BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t,
- MaxConns, $\t, get_value(connection_timeout, Options),
- $\t, get_value(retries, Options),
- $\t, io_lib:format("~p", [get_value(socket_options, Options)]),
- case StartSeq of
- ?LOWEST_SEQ ->
- "";
- _ ->
- io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq])
- end]),
-
+ log_replication_start(State),
couch_log:debug("Worker pids are: ~p", [Workers]),
doc_update_triggered(Rep),
@@ -254,16 +235,21 @@ handle_call({report_seq_done, Seq, StatsInc}, From,
update_task(NewState),
{noreply, NewState}.
-
-handle_cast({db_compacted, DbName},
- #rep_state{source = #db{name = DbName} = Source} = State) ->
- {ok, NewSource} = couch_db:reopen(Source),
- {noreply, State#rep_state{source = NewSource}};
-
-handle_cast({db_compacted, DbName},
- #rep_state{target = #db{name = DbName} = Target} = State) ->
- {ok, NewTarget} = couch_db:reopen(Target),
- {noreply, State#rep_state{target = NewTarget}};
+handle_cast({db_compacted, DbName}, State) ->
+ #rep_state{
+ source = Source,
+ target = Target
+ } = State,
+ SourceName = couch_replicator_utils:local_db_name(Source),
+ TargetName = couch_replicator_utils:local_db_name(Target),
+ case DbName of
+ SourceName ->
+ {ok, NewSource} = couch_db:reopen(Source),
+ {noreply, State#rep_state{source = NewSource}};
+ TargetName ->
+ {ok, NewTarget} = couch_db:reopen(Target),
+ {noreply, State#rep_state{target = NewTarget}}
+ end;
handle_cast(checkpoint, State) ->
case do_checkpoint(State) of
@@ -358,10 +344,11 @@ handle_info(timeout, InitArgs) ->
{stop, {shutdown, max_backoff}, {error, InitArgs}};
Class:Error ->
ShutdownReason = {error, replication_start_error(Error)},
+ StackTop2 = lists:sublist(erlang:get_stacktrace(), 2),
% Shutdown state is a hack as it is not really the state of the
% gen_server (it failed to initialize, so it doesn't have one).
% Shutdown state is used to pass extra info about why start failed.
- ShutdownState = {error, Class, erlang:get_stacktrace(), InitArgs},
+ ShutdownState = {error, Class, StackTop2, InitArgs},
{stop, {shutdown, ShutdownReason}, ShutdownState}
end.
@@ -394,11 +381,20 @@ terminate({shutdown, max_backoff}, {error, InitArgs}) ->
couch_replicator_notifier:notify({error, RepId, max_backoff});
terminate({shutdown, {error, Error}}, {error, Class, Stack, InitArgs}) ->
- #rep{id=RepId} = InitArgs,
+ #rep{
+ id = {BaseId, Ext} = RepId,
+ source = Source0,
+ target = Target0,
+ doc_id = DocId,
+ db_name = DbName
+ } = InitArgs,
+ Source = couch_replicator_api_wrap:db_uri(Source0),
+ Target = couch_replicator_api_wrap:db_uri(Target0),
+ RepIdStr = BaseId ++ Ext,
+ Msg = "~p:~p: Replication ~s failed to start ~p -> ~p doc ~p:~p stack:~p",
+ couch_log:error(Msg, [Class, Error, RepIdStr, Source, Target, DbName,
+ DocId, Stack]),
couch_stats:increment_counter([couch_replicator, failed_starts]),
- CleanInitArgs = rep_strip_creds(InitArgs),
- couch_log:error("~p:~p: Replication failed to start for args ~p: ~p",
- [Class, Error, CleanInitArgs, Stack]),
couch_replicator_notifier:notify({error, RepId, Error});
terminate({shutdown, max_backoff}, State) ->
@@ -436,7 +432,37 @@ code_change(_OldVsn, #rep_state{}=State, _Extra) ->
format_status(_Opt, [_PDict, State]) ->
- [{data, [{"State", state_strip_creds(State)}]}].
+ #rep_state{
+ source = Source,
+ target = Target,
+ rep_details = RepDetails,
+ start_seq = StartSeq,
+ source_seq = SourceSeq,
+ committed_seq = CommittedSeq,
+ current_through_seq = ThroughSeq,
+ highest_seq_done = HighestSeqDone,
+ session_id = SessionId
+ } = state_strip_creds(State),
+ #rep{
+ id = RepId,
+ options = Options,
+ doc_id = DocId,
+ db_name = DbName
+ } = RepDetails,
+ [
+ {rep_id, RepId},
+ {source, couch_replicator_api_wrap:db_uri(Source)},
+ {target, couch_replicator_api_wrap:db_uri(Target)},
+ {db_name, DbName},
+ {doc_id, DocId},
+ {options, Options},
+ {session_id, SessionId},
+ {start_seq, StartSeq},
+ {source_seq, SourceSeq},
+ {committed_seq, CommittedSeq},
+ {current_through_seq, ThroughSeq},
+ {highest_seq_done, HighestSeqDone}
+ ].
startup_jitter() ->
@@ -910,10 +936,10 @@ has_session_id(SessionId, [{Props} | Rest]) ->
end.
-db_monitor(#db{} = Db) ->
- couch_db:monitor(Db);
-db_monitor(_HttpDb) ->
- nil.
+db_monitor(#httpdb{}) ->
+ nil;
+db_monitor(Db) ->
+ couch_db:monitor(Db).
get_pending_count(St) ->
@@ -984,5 +1010,99 @@ replication_start_error({unauthorized, DbUri}) ->
{unauthorized, <<"unauthorized to access or create database ", DbUri/binary>>};
replication_start_error({db_not_found, DbUri}) ->
{db_not_found, <<"could not open ", DbUri/binary>>};
+replication_start_error({http_request_failed, _Method, Url0,
+ {error, {error, {conn_failed, {error, nxdomain}}}}}) ->
+ Url = ?l2b(couch_util:url_strip_password(Url0)),
+ {nxdomain, <<"could not resolve ", Url/binary>>};
+replication_start_error({http_request_failed, Method0, Url0,
+ {error, {code, Code}}}) when is_integer(Code) ->
+ Url = ?l2b(couch_util:url_strip_password(Url0)),
+ Method = ?l2b(Method0),
+ {http_error_code, Code, <<Method/binary, " ", Url/binary>>};
replication_start_error(Error) ->
Error.
+
+
+log_replication_start(#rep_state{rep_details = Rep} = RepState) ->
+ #rep{
+ id = {BaseId, Ext},
+ doc_id = DocId,
+ db_name = DbName,
+ options = Options
+ } = Rep,
+ Id = BaseId ++ Ext,
+ Workers = get_value(worker_processes, Options),
+ BatchSize = get_value(worker_batch_size, Options),
+ #rep_state{
+ source_name = Source, % credentials already stripped
+ target_name = Target, % credentials already stripped
+ session_id = Sid
+ } = RepState,
+ From = case DbName of
+ ShardName when is_binary(ShardName) ->
+ io_lib:format("from doc ~s:~s", [mem3:dbname(ShardName), DocId]);
+ _ ->
+ "from _replicate endpoint"
+ end,
+ Msg = "Starting replication ~s (~s -> ~s) ~s worker_processes:~p"
+ " worker_batch_size:~p session_id:~s",
+ couch_log:notice(Msg, [Id, Source, Target, From, Workers, BatchSize, Sid]).
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+replication_start_error_test() ->
+ ?assertEqual({unauthorized, <<"unauthorized to access or create database"
+ " http://x/y">>}, replication_start_error({unauthorized,
+ <<"http://x/y">>})),
+ ?assertEqual({db_not_found, <<"could not open http://x/y">>},
+ replication_start_error({db_not_found, <<"http://x/y">>})),
+ ?assertEqual({nxdomain,<<"could not resolve http://x/y">>},
+ replication_start_error({http_request_failed, "GET", "http://x/y",
+ {error, {error, {conn_failed, {error, nxdomain}}}}})),
+ ?assertEqual({http_error_code,503,<<"GET http://x/y">>},
+ replication_start_error({http_request_failed, "GET", "http://x/y",
+ {error, {code, 503}}})).
+
+
+scheduler_job_format_status_test() ->
+ Source = <<"http://u:p@h1/d1">>,
+ Target = <<"http://u:p@h2/d2">>,
+ Rep = #rep{
+ id = {"base", "+ext"},
+ source = couch_replicator_docs:parse_rep_db(Source, [], []),
+ target = couch_replicator_docs:parse_rep_db(Target, [], []),
+ options = [{create_target, true}],
+ doc_id = <<"mydoc">>,
+ db_name = <<"mydb">>
+ },
+ State = #rep_state{
+ rep_details = Rep,
+ source = Rep#rep.source,
+ target = Rep#rep.target,
+ session_id = <<"a">>,
+ start_seq = <<"1">>,
+ source_seq = <<"2">>,
+ committed_seq = <<"3">>,
+ current_through_seq = <<"4">>,
+ highest_seq_done = <<"5">>
+ },
+ Format = format_status(opts_ignored, [pdict, State]),
+ ?assertEqual("http://u:*****@h1/d1/", proplists:get_value(source, Format)),
+ ?assertEqual("http://u:*****@h2/d2/", proplists:get_value(target, Format)),
+ ?assertEqual({"base", "+ext"}, proplists:get_value(rep_id, Format)),
+ ?assertEqual([{create_target, true}], proplists:get_value(options, Format)),
+ ?assertEqual(<<"mydoc">>, proplists:get_value(doc_id, Format)),
+ ?assertEqual(<<"mydb">>, proplists:get_value(db_name, Format)),
+ ?assertEqual(<<"a">>, proplists:get_value(session_id, Format)),
+ ?assertEqual(<<"1">>, proplists:get_value(start_seq, Format)),
+ ?assertEqual(<<"2">>, proplists:get_value(source_seq, Format)),
+ ?assertEqual(<<"3">>, proplists:get_value(committed_seq, Format)),
+ ?assertEqual(<<"4">>, proplists:get_value(current_through_seq, Format)),
+ ?assertEqual(<<"5">>, proplists:get_value(highest_seq_done, Format)).
+
+
+-endif.
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index 05836d483..01881e423 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -16,6 +16,7 @@
parse_rep_doc/2,
open_db/1,
close_db/1,
+ local_db_name/1,
start_db_compaction_notifier/2,
stop_db_compaction_notifier/1,
replication_id/2,
@@ -35,6 +36,7 @@
-include_lib("couch/include/couch_db.hrl").
-include("couch_replicator.hrl").
+-include("couch_replicator_api_wrap.hrl").
-import(couch_util, [
get_value/2,
@@ -42,26 +44,35 @@
]).
-open_db(#db{name = Name, user_ctx = UserCtx}) ->
- {ok, Db} = couch_db:open(Name, [{user_ctx, UserCtx} | []]),
- Db;
-open_db(HttpDb) ->
- HttpDb.
+open_db(#httpdb{} = HttpDb) ->
+ HttpDb;
+open_db(Db) ->
+ DbName = couch_db:name(Db),
+ UserCtx = couch_db:get_user_ctx(Db),
+ {ok, NewDb} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
+ NewDb.
-close_db(#db{} = Db) ->
- couch_db:close(Db);
-close_db(_HttpDb) ->
- ok.
+close_db(#httpdb{}) ->
+ ok;
+close_db(Db) ->
+ couch_db:close(Db).
+
+
+local_db_name(#httpdb{}) ->
+ undefined;
+local_db_name(Db) ->
+ couch_db:name(Db).
-start_db_compaction_notifier(#db{name = DbName}, Server) ->
+start_db_compaction_notifier(#httpdb{}, _) ->
+ nil;
+start_db_compaction_notifier(Db, Server) ->
+ DbName = couch_db:name(Db),
{ok, Pid} = couch_event:link_listener(
?MODULE, handle_db_event, Server, [{dbname, DbName}]
),
- Pid;
-start_db_compaction_notifier(_, _) ->
- nil.
+ Pid.
stop_db_compaction_notifier(nil) ->
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index b52640d5d..344b8f286 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -67,16 +67,16 @@
-start_link(Cp, #db{} = Source, Target, ChangesManager, _MaxConns) ->
+start_link(Cp, #httpdb{} = Source, Target, ChangesManager, MaxConns) ->
+ gen_server:start_link(
+ ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []);
+
+start_link(Cp, Source, Target, ChangesManager, _MaxConns) ->
Pid = spawn_link(fun() ->
erlang:put(last_stats_report, now()),
queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager)
end),
- {ok, Pid};
-
-start_link(Cp, Source, Target, ChangesManager, MaxConns) ->
- gen_server:start_link(
- ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, []).
+ {ok, Pid}.
init({Cp, Source, Target, ChangesManager, MaxConns}) ->
@@ -139,15 +139,23 @@ handle_call(flush, {Pid, _} = From,
{noreply, State2#state{flush_waiter = From}}.
-handle_cast({db_compacted, DbName},
- #state{source = #db{name = DbName} = Source} = State) ->
- {ok, NewSource} = couch_db:reopen(Source),
- {noreply, State#state{source = NewSource}};
-
-handle_cast({db_compacted, DbName},
- #state{target = #db{name = DbName} = Target} = State) ->
- {ok, NewTarget} = couch_db:reopen(Target),
- {noreply, State#state{target = NewTarget}};
+handle_cast({db_compacted, DbName} = Msg, #state{} = State) ->
+ #state{
+ source = Source,
+ target = Target
+ } = State,
+ SourceName = couch_replicator_utils:local_db_name(Source),
+ TargetName = couch_replicator_utils:local_db_name(Target),
+ case DbName of
+ SourceName ->
+ {ok, NewSource} = couch_db:reopen(Source),
+ {noreply, State#state{source = NewSource}};
+ TargetName ->
+ {ok, NewTarget} = couch_db:reopen(Target),
+ {noreply, State#state{target = NewTarget}};
+ _Else ->
+ {stop, {unexpected_async_call, Msg}, State}
+ end;
handle_cast(Msg, State) ->
{stop, {unexpected_async_call, Msg}, State}.
@@ -227,15 +235,15 @@ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
Target2 = open_db(Target),
{IdRevs, Stats0} = find_missing(Changes, Target2),
case Source of
- #db{} ->
- Source2 = open_db(Source),
- Stats = local_process_batch(
- IdRevs, Cp, Source2, Target2, #batch{}, Stats0),
- close_db(Source2);
#httpdb{} ->
ok = gen_server:call(Parent, {add_stats, Stats0}, infinity),
remote_process_batch(IdRevs, Parent),
- {ok, Stats} = gen_server:call(Parent, flush, infinity)
+ {ok, Stats} = gen_server:call(Parent, flush, infinity);
+ _Db ->
+ Source2 = open_db(Source),
+ Stats = local_process_batch(
+ IdRevs, Cp, Source2, Target2, #batch{}, Stats0),
+ close_db(Source2)
end,
close_db(Target2),
ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity),
@@ -252,7 +260,7 @@ local_process_batch([], Cp, Source, Target, #batch{docs = Docs, size = Size}, St
case Target of
#httpdb{} ->
couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]);
- #db{} ->
+ _Db ->
couch_log:debug("Worker flushing doc batch of ~p docs", [Size])
end,
Stats2 = flush_docs(Target, Docs),
@@ -367,7 +375,7 @@ spawn_writer(Target, #batch{docs = DocList, size = Size}) ->
case {Target, Size > 0} of
{#httpdb{}, true} ->
couch_log:debug("Worker flushing doc batch of size ~p bytes", [Size]);
- {#db{}, true} ->
+ {_Db, true} ->
couch_log:debug("Worker flushing doc batch of ~p docs", [Size]);
_ ->
ok
@@ -429,7 +437,7 @@ maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
end
end;
-maybe_flush_docs(#db{} = Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) ->
+maybe_flush_docs(Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) ->
case SizeAcc + 1 of
SizeAcc2 when SizeAcc2 >= ?DOC_BUFFER_LEN ->
couch_log:debug("Worker flushing doc batch of ~p docs", [SizeAcc2]),
diff --git a/src/couch_replicator/test/couch_replicator_compact_tests.erl b/src/couch_replicator/test/couch_replicator_compact_tests.erl
index a98feee66..f06a684b5 100644
--- a/src/couch_replicator/test/couch_replicator_compact_tests.erl
+++ b/src/couch_replicator/test/couch_replicator_compact_tests.erl
@@ -87,8 +87,8 @@ should_all_processes_be_alive(RepPid, Source, Target) ->
{ok, SourceDb} = reopen_db(Source),
{ok, TargetDb} = reopen_db(Target),
?assert(is_process_alive(RepPid)),
- ?assert(is_process_alive(SourceDb#db.main_pid)),
- ?assert(is_process_alive(TargetDb#db.main_pid))
+ ?assert(is_process_alive(couch_db:get_pid(SourceDb))),
+ ?assert(is_process_alive(couch_db:get_pid(TargetDb)))
end).
should_run_replication(RepPid, RepId, Source, Target) ->
@@ -164,12 +164,12 @@ should_populate_and_compact(RepPid, Source, Target, BatchSize, Rounds) ->
compact_db("source", SourceDb),
?assert(is_process_alive(RepPid)),
- ?assert(is_process_alive(SourceDb#db.main_pid)),
+ ?assert(is_process_alive(couch_db:get_pid(SourceDb))),
wait_for_compaction("source", SourceDb),
compact_db("target", TargetDb),
?assert(is_process_alive(RepPid)),
- ?assert(is_process_alive(TargetDb#db.main_pid)),
+ ?assert(is_process_alive(couch_db:get_pid(TargetDb))),
wait_for_compaction("target", TargetDb),
{ok, SourceDb2} = reopen_db(SourceDb),
@@ -180,14 +180,14 @@ should_populate_and_compact(RepPid, Source, Target, BatchSize, Rounds) ->
compact_db("source", SourceDb2),
?assert(is_process_alive(RepPid)),
- ?assert(is_process_alive(SourceDb2#db.main_pid)),
+ ?assert(is_process_alive(couch_db:get_pid(SourceDb2))),
pause_writer(Writer),
wait_for_compaction("source", SourceDb2),
resume_writer(Writer),
compact_db("target", TargetDb2),
?assert(is_process_alive(RepPid)),
- ?assert(is_process_alive(TargetDb2#db.main_pid)),
+ ?assert(is_process_alive(couch_db:get_pid(TargetDb2))),
pause_writer(Writer),
wait_for_compaction("target", TargetDb2),
resume_writer(Writer)
@@ -263,14 +263,16 @@ should_compare_databases(Source, Target) ->
reopen_db({remote, Db}) ->
reopen_db(Db);
-reopen_db(#db{name=DbName}) ->
- reopen_db(DbName);
-reopen_db(DbName) ->
+reopen_db(DbName) when is_binary(DbName) ->
{ok, Db} = couch_db:open_int(DbName, []),
ok = couch_db:close(Db),
- {ok, Db}.
+ {ok, Db};
+reopen_db(Db) ->
+ reopen_db(couch_db:name(Db)).
-compact_db(Type, #db{name = Name}) ->
+
+compact_db(Type, Db0) ->
+ Name = couch_db:name(Db0),
{ok, Db} = couch_db:open_int(Name, []),
{ok, CompactPid} = couch_db:start_compact(Db),
MonRef = erlang:monitor(process, CompactPid),
@@ -405,7 +407,8 @@ stop_writer(Pid) ->
{reason, "Timeout stopping source database writer"}]})
end.
-writer_loop(#db{name = DbName}, Parent, Counter) ->
+writer_loop(Db0, Parent, Counter) ->
+ DbName = couch_db:name(Db0),
{ok, Data} = file:read_file(?ATTFILE),
maybe_pause(Parent, Counter),
Doc = couch_doc:from_json_obj({[
diff --git a/src/couch_stats/src/couch_stats.app.src b/src/couch_stats/src/couch_stats.app.src
index d60ce1c0a..6339a0f1d 100644
--- a/src/couch_stats/src/couch_stats.app.src
+++ b/src/couch_stats/src/couch_stats.app.src
@@ -16,7 +16,5 @@
{registered, [couch_stats_aggregator, couch_stats_process_tracker]},
{applications, [kernel, stdlib, folsom, couch_log]},
{mod, {couch_stats_app, []}},
- {env, [
- {collection_interval, 10}
- ]}
+ {env, []}
]}.
diff --git a/src/couch_stats/src/couch_stats.erl b/src/couch_stats/src/couch_stats.erl
index e02da29f1..59175f7a8 100644
--- a/src/couch_stats/src/couch_stats.erl
+++ b/src/couch_stats/src/couch_stats.erl
@@ -29,6 +29,10 @@
update_gauge/2
]).
+
+-include("couch_stats.hrl").
+
+
-type response() :: ok | {error, unknown_metric}.
-type stat() :: {any(), [{atom(), any()}]}.
@@ -56,7 +60,7 @@ new(counter, Name) ->
{error, Name, metric_already_exists} -> {error, metric_exists}
end;
new(histogram, Name) ->
- {ok, Time} = application:get_env(couch_stats, collection_interval),
+ Time = config:get_integer("stats", "interval", ?DEFAULT_INTERVAL),
case folsom_metrics:new_histogram(Name, slide_uniform, {Time, 1024}) of
ok -> ok;
{error, Name, metric_already_exists} -> {error, metric_exists}
diff --git a/src/couch_stats/src/couch_stats.hrl b/src/couch_stats/src/couch_stats.hrl
new file mode 100644
index 000000000..3cffe99f1
--- /dev/null
+++ b/src/couch_stats/src/couch_stats.hrl
@@ -0,0 +1,14 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-define(DEFAULT_INTERVAL, 10).
+-define(RELOAD_INTERVAL, 600).
diff --git a/src/couch_stats/src/couch_stats_aggregator.erl b/src/couch_stats/src/couch_stats_aggregator.erl
index 0f6c9dd83..17bd6fc33 100644
--- a/src/couch_stats/src/couch_stats_aggregator.erl
+++ b/src/couch_stats/src/couch_stats_aggregator.erl
@@ -30,6 +30,9 @@
terminate/2
]).
+
+-include("couch_stats.hrl").
+
-record(st, {
descriptions,
stats,
@@ -52,11 +55,9 @@ start_link() ->
init([]) ->
{ok, Descs} = reload_metrics(),
- Interval = case application:get_env(couch_stats, collection_interval) of
- {ok, I} -> I * 1000
- end,
- {ok, CT} = timer:send_interval(Interval, self(), collect),
- {ok, RT} = timer:send_interval(600000, self(), reload),
+ Interval = config:get_integer("stats", "interval", ?DEFAULT_INTERVAL),
+ {ok, CT} = timer:send_interval(Interval * 1000, self(), collect),
+ {ok, RT} = timer:send_interval(?RELOAD_INTERVAL * 1000, self(), reload),
{ok, #st{descriptions=Descs, stats=[], collect_timer=CT, reload_timer=RT}}.
handle_call(fetch, _from, #st{stats = Stats}=State) ->
diff --git a/src/fabric/include/couch_db_tmp.hrl b/src/fabric/include/couch_db_tmp.hrl
deleted file mode 100644
index cd3a047d4..000000000
--- a/src/fabric/include/couch_db_tmp.hrl
+++ /dev/null
@@ -1,296 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--define(LOCAL_DOC_PREFIX, "_local/").
--define(DESIGN_DOC_PREFIX0, "_design").
--define(DESIGN_DOC_PREFIX, "_design/").
-
--define(MIN_STR, <<"">>).
--define(MAX_STR, <<255>>). % illegal utf string
-
--define(JSON_ENCODE(V), couch_util:json_encode(V)).
--define(JSON_DECODE(V), couch_util:json_decode(V)).
-
--define(b2l(V), binary_to_list(V)).
--define(l2b(V), list_to_binary(V)).
-
--define(DEFAULT_ATTACHMENT_CONTENT_TYPE, <<"application/octet-stream">>).
-
--define(LOG_DEBUG(Format, Args), couch_log:debug(Format, Args)).
--define(LOG_INFO(Format, Args), couch_log:notice(Format, Args)).
--define(LOG_ERROR(Format, Args), couch_log:error(Format, Args)).
-
--record(rev_info,
- {
- rev,
- seq = 0,
- deleted = false,
- body_sp = nil % stream pointer
- }).
-
--record(doc_info,
- {
- id = <<"">>,
- high_seq = 0,
- revs = [] % rev_info
- }).
-
--record(full_doc_info,
- {id = <<"">>,
- update_seq = 0,
- deleted = false,
- data_size = 0,
- rev_tree = []
- }).
-
--record(httpd,
- {mochi_req,
- peer,
- method,
- path_parts,
- db_url_handlers,
- user_ctx,
- req_body = undefined,
- design_url_handlers,
- auth,
- default_fun,
- url_handlers
- }).
-
-
--record(doc,
- {
- id = <<"">>,
- revs = {0, []},
-
- % the json body object.
- body = {[]},
-
- atts = [], % attachments
-
- deleted = false,
-
- % key/value tuple of meta information, provided when using special options:
- % couch_db:open_doc(Db, Id, Options).
- meta = []
- }).
-
-
--record(att,
- {
- name,
- type,
- att_len,
- disk_len, % length of the attachment in its identity form
- % (that is, without a content encoding applied to it)
- % differs from att_len when encoding /= identity
- md5= <<>>,
- revpos=0,
- data,
- encoding=identity % currently supported values are:
- % identity, gzip
- % additional values to support in the future:
- % deflate, compress
- }).
-
-
--record(user_ctx,
- {
- name=null,
- roles=[],
- handler
- }).
-
-% This should be updated anytime a header change happens that requires more
-% than filling in new defaults.
-%
-% As long the changes are limited to new header fields (with inline
-% defaults) added to the end of the record, then there is no need to increment
-% the disk revision number.
-%
-% if the disk revision is incremented, then new upgrade logic will need to be
-% added to couch_db_updater:init_db.
-
--define(LATEST_DISK_VERSION, 5).
-
--record(db_header,
- {disk_version = ?LATEST_DISK_VERSION,
- update_seq = 0,
- unused = 0,
- id_tree_state = nil,
- seq_tree_state = nil,
- local_tree_state = nil,
- purge_seq = 0,
- purged_docs = nil,
- security_ptr = nil,
- revs_limit = 1000
- }).
-
--record(db,
- {main_pid = nil,
- update_pid = nil,
- compactor_pid = nil,
- instance_start_time, % number of microsecs since jan 1 1970 as a binary string
- fd,
- fd_monitor,
- header = #db_header{},
- committed_update_seq,
- id_tree,
- seq_tree,
- local_tree,
- update_seq,
- name,
- filepath,
- validate_doc_funs = undefined,
- security = [],
- security_ptr = nil,
- user_ctx = #user_ctx{},
- waiting_delayed_commit = nil,
- revs_limit = 1000,
- fsync_options = [],
- is_sys_db = false
- }).
-
-
--record(view_query_args, {
- start_key,
- end_key,
- start_docid = ?MIN_STR,
- end_docid = ?MAX_STR,
-
- direction = fwd,
- inclusive_end=true, % aka a closed-interval
-
- limit = 10000000000, % Huge number to simplify logic
- skip = 0,
-
- group_level = 0,
-
- view_type = nil,
- include_docs = false,
- stale = false,
- multi_get = false,
- callback = nil,
- list = nil,
- keys = nil,
- sorted = true,
- extra = []
-}).
-
--record(view_fold_helper_funs, {
- reduce_count,
- passed_end,
- start_response,
- send_row
-}).
-
--record(reduce_fold_helper_funs, {
- start_response,
- send_row
-}).
-
--record(extern_resp_args, {
- code = 200,
- stop = false,
- data = <<>>,
- ctype = "application/json",
- headers = [],
- json = nil
-}).
-
--record(group, {
- sig=nil,
- dbname,
- fd=nil,
- name,
- def_lang,
- design_options=[],
- views,
- id_btree=nil,
- current_seq=0,
- purge_seq=0,
- query_server=nil,
- waiting_delayed_commit=nil,
- atts=[]
- }).
-
--record(view,
- {id_num,
- map_names=[],
- def,
- btree=nil,
- reduce_funs=[],
- dbcopies=[],
- options=[]
- }).
-
--record(index_header,
- {seq=0,
- purge_seq=0,
- id_btree_state=nil,
- view_states=nil
- }).
-
--record(http_db, {
- url,
- auth = [],
- resource = "",
- headers = [
- {"User-Agent", "CouchDB/"++couch:version()},
- {"Accept", "application/json"},
- {"Accept-Encoding", "gzip"}
- ],
- qs = [],
- method = get,
- body = nil,
- options = [
- {response_format,binary},
- {inactivity_timeout, 30000}
- ],
- retries = 10,
- pause = 500,
- conn = nil
-}).
-
-% small value used in revision trees to indicate the revision isn't stored
--define(REV_MISSING, []).
-
--record(changes_args, {
- feed = "normal",
- dir = fwd,
- since = "0",
- limit = 1000000000000000,
- style = main_only,
- heartbeat,
- timeout,
- filter,
- include_docs = false
-}).
-
--record(proc, {
- pid,
- lang,
- client = nil,
- ddoc_keys = [],
- prompt_fun,
- set_timeout_fun,
- stop_fun,
- data_fun
-}).
-
--record(leaf, {
- deleted,
- ptr,
- seq,
- size = 0,
- atts = []
-}).
diff --git a/src/fabric/rebar.config b/src/fabric/rebar.config
index ccfb9b435..df35ac639 100644
--- a/src/fabric/rebar.config
+++ b/src/fabric/rebar.config
@@ -11,5 +11,5 @@
% the License.
{deps, [
- {meck, ".*", {git, "https://github.com/apache/couchdb-meck.git", {tag, "0.8.2"}}}
+ {meck, ".*", {git, "https://github.com/apache/couchdb-meck.git", {tag, "0.8.8"}}}
]}.
diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index 1dcdb0e00..4a0727131 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -38,7 +38,7 @@
-include_lib("fabric/include/fabric.hrl").
--type dbname() :: (iodata() | #db{}).
+-type dbname() :: (iodata() | tuple()).
-type docid() :: iodata().
-type revision() :: {integer(), binary()}.
-type callback() :: fun((any(), any()) -> {ok | stop, any()}).
@@ -483,10 +483,12 @@ dbname(DbName) when is_list(DbName) ->
list_to_binary(DbName);
dbname(DbName) when is_binary(DbName) ->
DbName;
-dbname(#db{name=Name}) ->
- Name;
-dbname(DbName) ->
- erlang:error({illegal_database_name, DbName}).
+dbname(Db) ->
+ try
+ couch_db:name(Db)
+ catch error:badarg ->
+ erlang:error({illegal_database_name, Db})
+ end.
name(Thing) ->
couch_util:to_binary(Thing).
diff --git a/src/fabric/src/fabric_db_info.erl b/src/fabric/src/fabric_db_info.erl
index ab93e4736..98e8e52e4 100644
--- a/src/fabric/src/fabric_db_info.erl
+++ b/src/fabric/src/fabric_db_info.erl
@@ -22,7 +22,8 @@ go(DbName) ->
Workers = fabric_util:submit_jobs(Shards, get_db_info, []),
RexiMon = fabric_util:create_monitors(Shards),
Fun = fun handle_message/3,
- Acc0 = {fabric_dict:init(Workers, nil), []},
+ {ok, ClusterInfo} = get_cluster_info(Shards),
+ Acc0 = {fabric_dict:init(Workers, nil), [{cluster, ClusterInfo}]},
try
case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of
{ok, Acc} -> {ok, Acc};
@@ -104,6 +105,8 @@ merge_results(Info) ->
[{other, {merge_other_results(X)}} | Acc];
(disk_format_version, X, Acc) ->
[{disk_format_version, lists:max(X)} | Acc];
+ (cluster, [X], Acc) ->
+ [{cluster, {X}} | Acc];
(_, _, Acc) ->
Acc
end, [{instance_start_time, <<"0">>}], Dict).
@@ -127,3 +130,46 @@ merge_object(Objects) ->
(Key, X, Acc) ->
[{Key, lists:sum(X)} | Acc]
end, [], Dict).
+
+get_cluster_info(Shards) ->
+ Dict = lists:foldl(fun(#shard{range = R}, Acc) ->
+ dict:update_counter(R, 1, Acc)
+ end, dict:new(), Shards),
+ Q = dict:size(Dict),
+ N = dict:fold(fun(_, X, Acc) -> max(X, Acc) end, 0, Dict),
+ %% defaults as per mem3:quorum/1
+ WR = N div 2 + 1,
+ {ok, [{q, Q}, {n, N}, {w, WR}, {r, WR}]}.
+
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+get_cluster_info_test_() ->
+ {
+ setup,
+ fun setup/0,
+ fun get_cluster_info_test_generator/1
+ }.
+
+
+setup() ->
+ Quorums = [1, 2, 3],
+ Shards = [1, 3, 5, 8, 12, 24],
+ [{N, Q} || N <- Quorums, Q <- Shards].
+
+get_cluster_info_test_generator([]) ->
+ [];
+get_cluster_info_test_generator([{N, Q} | Rest]) ->
+ {generator,
+ fun() ->
+ Nodes = lists:seq(1, 8),
+ Shards = mem3_util:create_partition_map(<<"foo">>, N, Q, Nodes),
+ {ok, Info} = get_cluster_info(Shards),
+ [
+ ?_assertEqual(N, couch_util:get_value(n, Info)),
+ ?_assertEqual(Q, couch_util:get_value(q, Info))
+ ] ++ get_cluster_info_test_generator(Rest)
+ end}.
+
+-endif.
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 93d7d1536..9cf653c59 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -16,8 +16,9 @@
-export([open_doc/3, open_revs/4, get_doc_info/3, get_full_doc_info/3,
get_missing_revs/2, get_missing_revs/3, update_docs/3]).
-export([all_docs/3, changes/3, map_view/4, reduce_view/4, group_info/2]).
--export([create_db/1, delete_db/1, reset_validation_funs/1, set_security/3,
- set_revs_limit/3, create_shard_db_doc/2, delete_shard_db_doc/2]).
+-export([create_db/1, create_db/2, delete_db/1, reset_validation_funs/1,
+ set_security/3, set_revs_limit/3, create_shard_db_doc/2,
+ delete_shard_db_doc/2]).
-export([get_all_security/2, open_shard/2]).
-export([compact/1, compact/2]).
@@ -38,7 +39,8 @@
}).
%% rpc endpoints
-%% call to with_db will supply your M:F with a #db{} and then remaining args
+%% call to with_db will supply your M:F with a Db instance
+%% and then remaining args
%% @equiv changes(DbName, Args, StartSeq, [])
changes(DbName, Args, StartSeq) ->
@@ -76,13 +78,13 @@ changes(DbName, Options, StartVector, DbOptions) ->
args = Args,
options = Options,
pending = couch_db:count_changes_since(Db, StartSeq),
- epochs = get_epochs(Db)
+ epochs = couch_db:get_epochs(Db)
},
try
{ok, #cacc{seq=LastSeq, pending=Pending, epochs=Epochs}} =
couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0),
rexi:stream_last({complete, [
- {seq, {LastSeq, uuid(Db), owner_of(LastSeq, Epochs)}},
+ {seq, {LastSeq, uuid(Db), couch_db:owner_of(Epochs, LastSeq)}},
{pending, Pending}
]})
after
@@ -144,7 +146,10 @@ fix_skip_and_limit(Args) ->
Args#mrargs{skip=0, limit=Skip+Limit}.
create_db(DbName) ->
- rexi:reply(case couch_server:create(DbName, []) of
+ create_db(DbName, []).
+
+create_db(DbName, Options) ->
+ rexi:reply(case couch_server:create(DbName, Options) of
{ok, _} ->
ok;
Error ->
@@ -225,7 +230,7 @@ get_missing_revs(DbName, IdRevsList, Options) ->
not_found ->
{Id, Revs, []}
end
- end, IdRevsList, couch_btree:lookup(Db#db.id_tree, Ids))};
+ end, IdRevsList, couch_db:get_full_doc_infos(Db, Ids))};
Error ->
Error
end).
@@ -249,8 +254,9 @@ group_info(DbName, DDocId, DbOptions) ->
reset_validation_funs(DbName) ->
case get_or_create_db(DbName, []) of
- {ok, #db{main_pid = Pid}} ->
- gen_server:cast(Pid, {load_validation_funs, undefined});
+ {ok, Db} ->
+ DbPid = couch_db:get_pid(Db),
+ gen_server:cast(DbPid, {load_validation_funs, undefined});
_ ->
ok
end.
@@ -362,7 +368,7 @@ changes_enumerator(DocInfo, Acc) ->
Opts = if Conflicts -> [conflicts | DocOptions]; true -> DocOptions end,
ChangesRow = {change, [
{pending, Pending-1},
- {seq, {Seq, uuid(Db), owner_of(Seq, Epochs)}},
+ {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}},
{id, Id},
{changes, Results},
{deleted, Del} |
@@ -460,79 +466,20 @@ set_io_priority(DbName, Options) ->
ok
end.
-calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) ->
- Seq;
-calculate_start_seq(Db, Node, {Seq, Uuid}) ->
- % Treat the current node as the epoch node
- calculate_start_seq(Db, Node, {Seq, Uuid, Node});
-calculate_start_seq(Db, _Node, {Seq, Uuid, EpochNode}) ->
- case is_prefix(Uuid, couch_db:get_uuid(Db)) of
- true ->
- case is_owner(EpochNode, Seq, couch_db:get_epochs(Db)) of
- true -> Seq;
- false -> 0
- end;
- false ->
- %% The file was rebuilt, most likely in a different
- %% order, so rewind.
- 0
- end;
-calculate_start_seq(Db, _Node, {replace, OriginalNode, Uuid, Seq}) ->
- case is_prefix(Uuid, couch_db:get_uuid(Db)) of
- true ->
- start_seq(get_epochs(Db), OriginalNode, Seq);
- false ->
+
+calculate_start_seq(Db, Node, Seq) ->
+ case couch_db:calculate_start_seq(Db, Node, Seq) of
+ N when is_integer(N) ->
+ N;
+ {replace, OriginalNode, Uuid, OriginalSeq} ->
%% Scan history looking for an entry with
%% * target_node == TargetNode
%% * target_uuid == TargetUUID
%% * target_seq =< TargetSeq
%% If such an entry is found, stream from associated source_seq
- mem3_rep:find_source_seq(Db, OriginalNode, Uuid, Seq)
+ mem3_rep:find_source_seq(Db, OriginalNode, Uuid, OriginalSeq)
end.
-is_prefix(Pattern, Subject) ->
- binary:longest_common_prefix([Pattern, Subject]) == size(Pattern).
-
-is_owner(Node, Seq, Epochs) ->
- validate_epochs(Epochs),
- Node =:= owner_of(Seq, Epochs).
-
-owner_of(_Seq, []) ->
- undefined;
-owner_of(Seq, [{EpochNode, EpochSeq} | _Rest]) when Seq > EpochSeq ->
- EpochNode;
-owner_of(Seq, [_ | Rest]) ->
- owner_of(Seq, Rest).
-
-get_epochs(Db) ->
- Epochs = couch_db:get_epochs(Db),
- validate_epochs(Epochs),
- Epochs.
-
-start_seq([{OrigNode, EpochSeq} | _], OrigNode, Seq) when Seq > EpochSeq ->
- %% OrigNode is the owner of the Seq so we can safely stream from there
- Seq;
-start_seq([{_, NewSeq}, {OrigNode, _} | _], OrigNode, Seq) when Seq > NewSeq ->
- %% We transferred this file before Seq was written on OrigNode, so we need
- %% to stream from the beginning of the next epoch. Note that it is _not_
- %% necessary for the current node to own the epoch beginning at NewSeq
- NewSeq;
-start_seq([_ | Rest], OrigNode, Seq) ->
- start_seq(Rest, OrigNode, Seq);
-start_seq([], OrigNode, Seq) ->
- erlang:error({epoch_mismatch, OrigNode, Seq}).
-
-validate_epochs(Epochs) ->
- %% Assert uniqueness.
- case length(Epochs) == length(lists:ukeysort(2, Epochs)) of
- true -> ok;
- false -> erlang:error(duplicate_epoch)
- end,
- %% Assert order.
- case Epochs == lists:sort(fun({_, A}, {_, B}) -> B =< A end, Epochs) of
- true -> ok;
- false -> erlang:error(epoch_order)
- end.
uuid(Db) ->
Uuid = couch_db:get_uuid(Db),
@@ -544,30 +491,6 @@ uuid_prefix_len() ->
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-calculate_start_seq_test() ->
- %% uuid mismatch is always a rewind.
- Hdr1 = couch_db_header:new(),
- Hdr2 = couch_db_header:set(Hdr1, [{epochs, [{node1, 1}]}, {uuid, <<"uuid1">>}]),
- ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, node1, {1, <<"uuid2">>})),
- %% uuid matches and seq is owned by node.
- Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]),
- ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, node1, {2, <<"uuid1">>})),
- %% uuids match but seq is not owned by node.
- Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]),
- ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, node1, {3, <<"uuid1">>})),
- %% return integer if we didn't get a vector.
- ?assertEqual(4, calculate_start_seq(#db{}, foo, 4)).
-
-is_owner_test() ->
- ?assertNot(is_owner(foo, 1, [])),
- ?assertNot(is_owner(foo, 1, [{foo, 1}])),
- ?assert(is_owner(foo, 2, [{foo, 1}])),
- ?assert(is_owner(foo, 50, [{bar, 100}, {foo, 1}])),
- ?assert(is_owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])),
- ?assert(is_owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])),
- ?assertError(duplicate_epoch, is_owner(foo, 1, [{foo, 1}, {bar, 1}])),
- ?assertError(epoch_order, is_owner(foo, 1, [{foo, 100}, {bar, 200}])).
-
maybe_filtered_json_doc_no_filter_test() ->
Body = {[{<<"a">>, 1}]},
Doc = #doc{id = <<"1">>, revs = {1, [<<"r1">>]}, body = Body},
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index 765561381..bf3f023db 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -305,7 +305,8 @@ path_ends_with(Path, Suffix) ->
fake_db(DbName, Opts) ->
{SecProps} = fabric:get_security(DbName), % as admin
UserCtx = couch_util:get_value(user_ctx, Opts, #user_ctx{}),
- #db{name = DbName, security = SecProps, user_ctx = UserCtx}.
+ {ok, Db} = couch_db:clustered_db(DbName, UserCtx, SecProps),
+ Db.
%% test function
kv(Item, Count) ->
diff --git a/src/mango/src/mango_crud.erl b/src/mango/src/mango_crud.erl
index 68c9d6cc4..41a4d143d 100644
--- a/src/mango/src/mango_crud.erl
+++ b/src/mango/src/mango_crud.erl
@@ -111,7 +111,7 @@ maybe_add_user_ctx(Db, Opts) ->
{user_ctx, _} ->
Opts;
false ->
- [{user_ctx, Db#db.user_ctx} | Opts]
+ [{user_ctx, couch_db:get_user_ctx(Db)} | Opts]
end.
diff --git a/src/mango/src/mango_cursor.erl b/src/mango/src/mango_cursor.erl
index f36febdfc..e0792b737 100644
--- a/src/mango/src/mango_cursor.erl
+++ b/src/mango/src/mango_cursor.erl
@@ -90,9 +90,9 @@ execute(#cursor{index=Idx}=Cursor, UserFun, UserAcc) ->
maybe_filter_indexes_by_ddoc(Indexes, Opts) ->
case lists:keyfind(use_index, 1, Opts) of
{use_index, []} ->
- %We remove any indexes that have a selector
+ % We remove any indexes that have a selector
% since they are only used when specified via use_index
- remove_indexes_with_selector(Indexes);
+ remove_indexes_with_partial_filter_selector(Indexes);
{use_index, [DesignId]} ->
filter_indexes(Indexes, DesignId);
{use_index, [DesignId, ViewName]} ->
@@ -117,9 +117,9 @@ filter_indexes(Indexes0, DesignId, ViewName) ->
lists:filter(FiltFun, Indexes).
-remove_indexes_with_selector(Indexes) ->
+remove_indexes_with_partial_filter_selector(Indexes) ->
FiltFun = fun(Idx) ->
- case mango_idx:get_idx_selector(Idx) of
+ case mango_idx:get_partial_filter_selector(Idx) of
undefined -> true;
_ -> false
end
diff --git a/src/mango/src/mango_cursor_text.erl b/src/mango/src/mango_cursor_text.erl
index 70c911ac1..88abfc00a 100644
--- a/src/mango/src/mango_cursor_text.erl
+++ b/src/mango/src/mango_cursor_text.erl
@@ -51,7 +51,7 @@ create(Db, Indexes, Selector, Opts0) ->
?MANGO_ERROR(multiple_text_indexes)
end,
- Opts = unpack_bookmark(Db#db.name, Opts0),
+ Opts = unpack_bookmark(couch_db:name(Db), Opts0),
DreyfusLimit = get_dreyfus_limit(),
Limit = erlang:min(DreyfusLimit, couch_util:get_value(limit, Opts, mango_opts:default_limit())),
@@ -98,7 +98,7 @@ execute(Cursor, UserFun, UserAcc) ->
},
CAcc = #cacc{
selector = Selector,
- dbname = Db#db.name,
+ dbname = couch_db:name(Db),
ddocid = ddocid(Idx),
idx_name = mango_idx:name(Idx),
bookmark = get_bookmark(Opts),
diff --git a/src/mango/src/mango_httpd.erl b/src/mango/src/mango_httpd.erl
index d3ebf48c9..3ed51e2f0 100644
--- a/src/mango/src/mango_httpd.erl
+++ b/src/mango/src/mango_httpd.erl
@@ -192,7 +192,8 @@ handle_find_req(Req, _Db) ->
set_user_ctx(#httpd{user_ctx=Ctx}, Db) ->
- Db#db{user_ctx=Ctx}.
+ {ok, NewDb} = couch_db:set_user_ctx(Db, Ctx),
+ NewDb.
get_idx_w_opts(Opts) ->
diff --git a/src/mango/src/mango_idx.erl b/src/mango/src/mango_idx.erl
index b8122517d..c5f870d5b 100644
--- a/src/mango/src/mango_idx.erl
+++ b/src/mango/src/mango_idx.erl
@@ -44,7 +44,7 @@
to_json/1,
delete/4,
get_usable_indexes/3,
- get_idx_selector/1
+ get_partial_filter_selector/1
]).
@@ -291,12 +291,12 @@ idx_mod(#idx{type = <<"text">>}) ->
end.
-db_to_name(#db{name=Name}) ->
- Name;
db_to_name(Name) when is_binary(Name) ->
Name;
db_to_name(Name) when is_list(Name) ->
- iolist_to_binary(Name).
+ iolist_to_binary(Name);
+db_to_name(Db) ->
+ couch_db:name(Db).
get_idx_def(Opts) ->
@@ -368,13 +368,66 @@ filter_opts([Opt | Rest]) ->
[Opt | filter_opts(Rest)].
-get_idx_selector(#idx{def = Def}) when Def =:= all_docs; Def =:= undefined ->
+get_partial_filter_selector(#idx{def = Def}) when Def =:= all_docs; Def =:= undefined ->
undefined;
-get_idx_selector(#idx{def = {Def}}) ->
+get_partial_filter_selector(#idx{def = {Def}}) ->
+ case proplists:get_value(<<"partial_filter_selector">>, Def) of
+ undefined -> get_legacy_selector(Def);
+ {[]} -> undefined;
+ Selector -> Selector
+ end.
+
+
+% Partial filter selectors is supported in text indexes via the selector field
+% This adds backwards support for existing indexes that might have a selector in it
+get_legacy_selector(Def) ->
case proplists:get_value(<<"selector">>, Def) of
undefined -> undefined;
{[]} -> undefined;
Selector -> Selector
end.
-
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+index(SelectorName, Selector) ->
+ {
+ idx,<<"mango_test_46418cd02081470d93290dc12306ebcb">>,
+ <<"_design/57e860dee471f40a2c74ea5b72997b81dda36a24">>,
+ <<"Selected">>,<<"json">>,
+ {[{<<"fields">>,{[{<<"location">>,<<"asc">>}]}},
+ {SelectorName,{Selector}}]},
+ [{<<"def">>,{[{<<"fields">>,[<<"location">>]}]}}]
+ }.
+
+get_partial_filter_all_docs_test() ->
+ Idx = #idx{def = all_docs},
+ ?assertEqual(undefined, get_partial_filter_selector(Idx)).
+
+get_partial_filter_undefined_def_test() ->
+ Idx = #idx{def = undefined},
+ ?assertEqual(undefined, get_partial_filter_selector(Idx)).
+
+get_partial_filter_selector_default_test() ->
+ Idx = index(<<"partial_filter_selector">>, []),
+ ?assertEqual(undefined, get_partial_filter_selector(Idx)).
+
+get_partial_filter_selector_missing_test() ->
+ Idx = index(<<"partial_filter_selector">>, []),
+ ?assertEqual(undefined, get_partial_filter_selector(Idx)).
+
+get_partial_filter_selector_with_selector_test() ->
+ Selector = [{<<"location">>,{[{<<"$gt">>,<<"FRA">>}]}}],
+ Idx = index(<<"partial_filter_selector">>, Selector),
+ ?assertEqual({Selector}, get_partial_filter_selector(Idx)).
+
+get_partial_filter_selector_with_legacy_selector_test() ->
+ Selector = [{<<"location">>,{[{<<"$gt">>,<<"FRA">>}]}}],
+ Idx = index(<<"selector">>, Selector),
+ ?assertEqual({Selector}, get_partial_filter_selector(Idx)).
+
+get_partial_filter_selector_with_legacy_default_selector_test() ->
+ Idx = index(<<"selector">>, []),
+ ?assertEqual(undefined, get_partial_filter_selector(Idx)).
+
+-endif.
diff --git a/src/mango/src/mango_idx_text.erl b/src/mango/src/mango_idx_text.erl
index f90ac7fac..e00c241d2 100644
--- a/src/mango/src/mango_idx_text.erl
+++ b/src/mango/src/mango_idx_text.erl
@@ -223,7 +223,13 @@ opts() ->
{optional, true},
{default, {[]}}
]},
- {<<"selector">>, [
+ {<<"partial_filter_selector">>, [
+ {tag, partial_filter_selector},
+ {optional, true},
+ {default, {[]}},
+ {validator, fun mango_opts:validate_selector/1}
+ ]},
+ {<<"selector">>, [
{tag, selector},
{optional, true},
{default, {[]}},
@@ -344,8 +350,9 @@ indexable_fields(Fields, {op_default, _}) ->
[<<"$default">> | Fields].
-maybe_reject_index_all_req({Def}, #db{name=DbName, user_ctx=Ctx}) ->
- User = Ctx#user_ctx.name,
+maybe_reject_index_all_req({Def}, Db) ->
+ DbName = couch_db:name(Db),
+ #user_ctx{name = User} = couch_db:get_user_ctx(Db),
Fields = couch_util:get_value(fields, Def),
case {Fields, forbid_index_all()} of
{all_fields, "true"} ->
@@ -374,7 +381,9 @@ setup() ->
end),
%default index all def that generates {fields, all_fields}
Index = #idx{def={[]}},
- Db = #db{name = <<"testdb">>, user_ctx=#user_ctx{name = <<"u1">>}},
+ DbName = <<"testdb">>,
+ UserCtx = #user_ctx{name = <<"u1">>},
+ {ok, Db} = couch_db:clustered_db(DbName, UserCtx),
{Index, Db, Ctx}.
diff --git a/src/mango/src/mango_idx_view.erl b/src/mango/src/mango_idx_view.erl
index d5dcd0c07..4cb039c4a 100644
--- a/src/mango/src/mango_idx_view.erl
+++ b/src/mango/src/mango_idx_view.erl
@@ -114,11 +114,12 @@ columns(Idx) ->
is_usable(Idx, Selector) ->
- % This index is usable if at least the first column is
- % a member of the indexable fields of the selector.
- Columns = columns(Idx),
- Fields = indexable_fields(Selector),
- lists:member(hd(Columns), Fields) andalso not is_text_search(Selector).
+ % This index is usable if all of the columns are
+ % restricted by the selector such that they are required to exist
+ % and the selector is not a text search (so requires a text index)
+ RequiredFields = columns(Idx),
+ mango_selector:has_required_fields(Selector, RequiredFields)
+ andalso not is_text_search(Selector).
is_text_search({[]}) ->
@@ -198,8 +199,8 @@ opts() ->
{tag, fields},
{validator, fun mango_opts:validate_sort/1}
]},
- {<<"selector">>, [
- {tag, selector},
+ {<<"partial_filter_selector">>, [
+ {tag, partial_filter_selector},
{optional, true},
{default, {[]}},
{validator, fun mango_opts:validate_selector/1}
diff --git a/src/mango/src/mango_native_proc.erl b/src/mango/src/mango_native_proc.erl
index 82081a976..61d79b7ec 100644
--- a/src/mango/src/mango_native_proc.erl
+++ b/src/mango/src/mango_native_proc.erl
@@ -135,7 +135,7 @@ index_doc(#st{indexes=Indexes}, Doc) ->
get_index_entries({IdxProps}, Doc) ->
{Fields} = couch_util:get_value(<<"fields">>, IdxProps),
- Selector = get_index_selector(IdxProps),
+ Selector = get_index_partial_filter_selector(IdxProps),
case should_index(Selector, Doc) of
false ->
[];
@@ -159,7 +159,7 @@ get_index_values(Fields, Doc) ->
get_text_entries({IdxProps}, Doc) ->
- Selector = get_index_selector(IdxProps),
+ Selector = get_index_partial_filter_selector(IdxProps),
case should_index(Selector, Doc) of
true ->
get_text_entries0(IdxProps, Doc);
@@ -168,10 +168,21 @@ get_text_entries({IdxProps}, Doc) ->
end.
-get_index_selector(IdxProps) ->
- case couch_util:get_value(<<"selector">>, IdxProps) of
- [] -> {[]};
- Else -> Else
+get_index_partial_filter_selector(IdxProps) ->
+ case couch_util:get_value(<<"partial_filter_selector">>, IdxProps) of
+ undefined ->
+ % this is to support legacy text indexes that had the partial_filter_selector
+ % set as selector
+ case couch_util:get_value(<<"selector">>, IdxProps, []) of
+ [] ->
+ {[]};
+ Else ->
+ Else
+ end;
+ [] ->
+ {[]};
+ Else ->
+ Else
end.
diff --git a/src/mango/src/mango_selector.erl b/src/mango/src/mango_selector.erl
index bcf347201..fe3998683 100644
--- a/src/mango/src/mango_selector.erl
+++ b/src/mango/src/mango_selector.erl
@@ -15,7 +15,8 @@
-export([
normalize/1,
- match/2
+ match/2,
+ has_required_fields/2
]).
@@ -566,3 +567,110 @@ match({[{Field, Cond}]}, Value, Cmp) ->
match({[_, _ | _] = _Props} = Sel, _Value, _Cmp) ->
erlang:error({unnormalized_selector, Sel}).
+
+
+% Returns true if Selector requires all
+% fields in RequiredFields to exist in any matching documents.
+
+% For each condition in the selector, check
+% whether the field is in RequiredFields.
+% If it is, remove it from RequiredFields and continue
+% until we match then all or run out of selector to
+% match against.
+
+% Empty selector
+has_required_fields({[]}, _) ->
+ false;
+
+% No more required fields
+has_required_fields(_, []) ->
+ true;
+
+% No more selector
+has_required_fields([], _) ->
+ false;
+
+has_required_fields(Selector, RequiredFields) when not is_list(Selector) ->
+ has_required_fields([Selector], RequiredFields);
+
+% We can "see" through $and operator. We ignore other
+% combination operators because they can't be used to restrict
+% an index.
+has_required_fields([{[{<<"$and">>, Args}]}], RequiredFields)
+ when is_list(Args) ->
+ has_required_fields(Args, RequiredFields);
+
+has_required_fields([{[{Field, Cond}]} | Rest], RequiredFields) ->
+ case Cond of
+ % $exists:false is a special case - this is the only operator
+ % that explicitly does not require a field to exist
+ {[{<<"$exists">>, false}]} ->
+ has_required_fields(Rest, RequiredFields);
+ _ ->
+ has_required_fields(Rest, lists:delete(Field, RequiredFields))
+ end.
+
+
+%%%%%%%% module tests below %%%%%%%%
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+has_required_fields_basic_test() ->
+ RequiredFields = [<<"A">>],
+ Selector = {[{<<"A">>, <<"foo">>}]},
+ Normalized = normalize(Selector),
+ ?assertEqual(true, has_required_fields(Normalized, RequiredFields)).
+
+has_required_fields_basic_failure_test() ->
+ RequiredFields = [<<"B">>],
+ Selector = {[{<<"A">>, <<"foo">>}]},
+ Normalized = normalize(Selector),
+ ?assertEqual(false, has_required_fields(Normalized, RequiredFields)).
+
+has_required_fields_empty_selector_test() ->
+ RequiredFields = [<<"A">>],
+ Selector = {[]},
+ Normalized = normalize(Selector),
+ ?assertEqual(false, has_required_fields(Normalized, RequiredFields)).
+
+has_required_fields_exists_false_test() ->
+ RequiredFields = [<<"A">>],
+ Selector = {[{<<"A">>,{[{<<"$exists">>, false}]}}]},
+ Normalized = normalize(Selector),
+ ?assertEqual(false, has_required_fields(Normalized, RequiredFields)).
+
+has_required_fields_and_true_test() ->
+ RequiredFields = [<<"A">>],
+ Selector = {[{<<"$and">>,
+ [
+ {[{<<"A">>, <<"foo">>}]},
+ {[{<<"B">>, <<"foo">>}]}
+ ]
+ }]},
+ Normalized = normalize(Selector),
+ ?assertEqual(true, has_required_fields(Normalized, RequiredFields)).
+
+has_required_fields_and_false_test() ->
+ RequiredFields = [<<"A">>, <<"C">>],
+ Selector = {[{<<"$and">>,
+ [
+ {[{<<"A">>, <<"foo">>}]},
+ {[{<<"B">>, <<"foo">>}]}
+ ]
+ }]},
+ Normalized = normalize(Selector),
+ ?assertEqual(false, has_required_fields(Normalized, RequiredFields)).
+
+has_required_fields_or_test() ->
+ RequiredFields = [<<"A">>],
+ Selector = {[{<<"$or">>,
+ [
+ {[{<<"A">>, <<"foo">>}]},
+ {[{<<"B">>, <<"foo">>}]}
+ ]
+ }]},
+ Normalized = normalize(Selector),
+ ?assertEqual(false, has_required_fields(Normalized, RequiredFields)).
+
+-endif. \ No newline at end of file
diff --git a/src/mango/test/01-index-crud-test.py b/src/mango/test/01-index-crud-test.py
index 6582020f5..617bfd523 100644
--- a/src/mango/test/01-index-crud-test.py
+++ b/src/mango/test/01-index-crud-test.py
@@ -35,7 +35,7 @@ class IndexCrudTests(mango.DbPerClass):
for fields in bad_fields:
try:
self.db.create_index(fields)
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 400
else:
raise AssertionError("bad create index")
@@ -54,7 +54,7 @@ class IndexCrudTests(mango.DbPerClass):
for bt in bad_types:
try:
self.db.create_index(["foo"], idx_type=bt)
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 400, (bt, e.response.status_code)
else:
raise AssertionError("bad create index")
@@ -70,13 +70,13 @@ class IndexCrudTests(mango.DbPerClass):
for bn in bad_names:
try:
self.db.create_index(["foo"], name=bn)
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 400
else:
raise AssertionError("bad create index")
try:
self.db.create_index(["foo"], ddoc=bn)
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 400
else:
raise AssertionError("bad create index")
@@ -207,7 +207,7 @@ class IndexCrudTests(mango.DbPerClass):
# Missing design doc
try:
self.db.delete_index("this_is_not_a_design_doc_id", "foo")
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 404
else:
raise AssertionError("bad index delete")
@@ -220,7 +220,7 @@ class IndexCrudTests(mango.DbPerClass):
ddocid = idx["ddoc"].split("/")[-1]
try:
self.db.delete_index(ddocid, "this_is_not_an_index_name")
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 404
else:
raise AssertionError("bad index delete")
@@ -228,7 +228,7 @@ class IndexCrudTests(mango.DbPerClass):
# Bad view type
try:
self.db.delete_index(ddocid, idx["name"], idx_type="not_a_real_type")
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 404
else:
raise AssertionError("bad index delete")
@@ -244,7 +244,6 @@ class IndexCrudTests(mango.DbPerClass):
for idx in self.db.list_indexes():
if idx["name"] != "text_idx_01":
continue
- print idx["def"]
assert idx["def"]["fields"] == [
{"stringidx": "string"},
{"booleanidx": "boolean"}
@@ -270,7 +269,7 @@ class IndexCrudTests(mango.DbPerClass):
for fields in bad_fields:
try:
self.db.create_text_index(fields=fields)
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 400
else:
raise AssertionError("bad create text index")
@@ -310,10 +309,10 @@ class IndexCrudTests(mango.DbPerClass):
try:
self.db.list_indexes(skip=-1)
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 500
try:
self.db.list_indexes(limit=0)
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 500
diff --git a/src/mango/test/02-basic-find-test.py b/src/mango/test/02-basic-find-test.py
index 699166e28..a8725ffa8 100644
--- a/src/mango/test/02-basic-find-test.py
+++ b/src/mango/test/02-basic-find-test.py
@@ -30,7 +30,7 @@ class BasicFindTests(mango.UserDocsTests):
for bs in bad_selectors:
try:
self.db.find(bs)
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 400
else:
raise AssertionError("bad find")
@@ -49,7 +49,7 @@ class BasicFindTests(mango.UserDocsTests):
for bl in bad_limits:
try:
self.db.find({"int":{"$gt":2}}, limit=bl)
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 400
else:
raise AssertionError("bad find")
@@ -68,7 +68,7 @@ class BasicFindTests(mango.UserDocsTests):
for bs in bad_skips:
try:
self.db.find({"int":{"$gt":2}}, skip=bs)
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 400
else:
raise AssertionError("bad find")
@@ -88,7 +88,7 @@ class BasicFindTests(mango.UserDocsTests):
for bs in bad_sorts:
try:
self.db.find({"int":{"$gt":2}}, sort=bs)
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 400
else:
raise AssertionError("bad find")
@@ -108,7 +108,7 @@ class BasicFindTests(mango.UserDocsTests):
for bf in bad_fields:
try:
self.db.find({"int":{"$gt":2}}, fields=bf)
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 400
else:
raise AssertionError("bad find")
@@ -126,7 +126,7 @@ class BasicFindTests(mango.UserDocsTests):
for br in bad_rs:
try:
self.db.find({"int":{"$gt":2}}, r=br)
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 400
else:
raise AssertionError("bad find")
@@ -142,7 +142,7 @@ class BasicFindTests(mango.UserDocsTests):
for bc in bad_conflicts:
try:
self.db.find({"int":{"$gt":2}}, conflicts=bc)
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 400
else:
raise AssertionError("bad find")
diff --git a/src/mango/test/03-operator-test.py b/src/mango/test/03-operator-test.py
index 863752682..1af39f205 100644
--- a/src/mango/test/03-operator-test.py
+++ b/src/mango/test/03-operator-test.py
@@ -26,8 +26,8 @@ class OperatorTests:
"manager": True,
"favorites": {"$all": ["Lisp", "Python"]}
})
- self.assertEqual(len(docs), 4)
- user_ids = [2,12,9,14]
+ self.assertEqual(len(docs), 3)
+ user_ids = [2,12,9]
self.assertUserIds(user_ids, docs)
def test_all_non_array(self):
@@ -124,7 +124,7 @@ class OperatorTests:
"manager": True,
"favorites": {"$in": ["Ruby", "Python"]}
})
- self.assertUserIds([2,6,7,9,11,12,14], docs)
+ self.assertUserIds([2,6,7,9,11,12], docs)
def test_nin_operator_array(self):
docs = self.db.find({
diff --git a/src/mango/test/05-index-selection-test.py b/src/mango/test/05-index-selection-test.py
index 2fb0a405b..1cc210382 100644
--- a/src/mango/test/05-index-selection-test.py
+++ b/src/mango/test/05-index-selection-test.py
@@ -23,7 +23,7 @@ class IndexSelectionTests(mango.UserDocsTests):
user_docs.add_text_indexes(klass.db, {})
def test_basic(self):
- resp = self.db.find({"name.last": "A last name"}, explain=True)
+ resp = self.db.find({"age": 123}, explain=True)
self.assertEqual(resp["index"]["type"], "json")
def test_with_and(self):
@@ -77,7 +77,7 @@ class IndexSelectionTests(mango.UserDocsTests):
def test_no_valid_sort_index(self):
try:
self.db.find({"_id": {"$gt": None}}, sort=["name"], return_raw=True)
- except Exception, e:
+ except Exception as e:
self.assertEqual(e.response.status_code, 400)
else:
raise AssertionError("bad find")
@@ -87,11 +87,83 @@ class IndexSelectionTests(mango.UserDocsTests):
ddocid = "_design/ad3d537c03cd7c6a43cf8dff66ef70ea54c2b40f"
try:
self.db.find({}, use_index=ddocid)
- except Exception, e:
+ except Exception as e:
self.assertEqual(e.response.status_code, 400)
else:
raise AssertionError("bad find")
+ def test_uses_all_docs_when_fields_do_not_match_selector(self):
+ # index exists on ["company", "manager"] but not ["company"]
+ # so we should fall back to all docs (so we include docs
+ # with no "manager" field)
+ selector = {
+ "company": "Pharmex"
+ }
+ docs = self.db.find(selector)
+ self.assertEqual(len(docs), 1)
+ self.assertEqual(docs[0]["company"], "Pharmex")
+ self.assertNotIn("manager", docs[0])
+
+ resp_explain = self.db.find(selector, explain=True)
+ self.assertEqual(resp_explain["index"]["type"], "special")
+
+ def test_uses_all_docs_when_selector_doesnt_require_fields_to_exist(self):
+ # as in test above, use a selector that doesn't overlap with the index
+ # due to an explicit exists clause
+ selector = {
+ "company": "Pharmex",
+ "manager": {"$exists": False}
+ }
+ docs = self.db.find(selector)
+ self.assertEqual(len(docs), 1)
+ self.assertEqual(docs[0]["company"], "Pharmex")
+ self.assertNotIn("manager", docs[0])
+
+ resp_explain = self.db.find(selector, explain=True)
+ self.assertEqual(resp_explain["index"]["type"], "special")
+
+ def test_uses_index_when_no_range_or_equals(self):
+ # index on ["manager"] should be valid because
+ # selector requires "manager" to exist. The
+ # selector doesn't narrow the keyrange so it's
+ # a full index scan
+ selector = {
+ "manager": {"$exists": True}
+ }
+ docs = self.db.find(selector)
+ self.assertEqual(len(docs), 14)
+
+ resp_explain = self.db.find(selector, explain=True)
+ self.assertEqual(resp_explain["index"]["type"], "json")
+
+
+ def test_reject_use_index_invalid_fields(self):
+ # index on ["company","manager"] which should not be valid
+ ddocid = "_design/a0c425a60cf3c3c09e3c537c9ef20059dcef9198"
+ selector = {
+ "company": "Pharmex"
+ }
+ try:
+ self.db.find(selector, use_index=ddocid)
+ except Exception as e:
+ self.assertEqual(e.response.status_code, 400)
+ else:
+ raise AssertionError("did not reject bad use_index")
+
+ def test_reject_use_index_sort_order(self):
+ # index on ["company","manager"] which should not be valid
+ ddocid = "_design/a0c425a60cf3c3c09e3c537c9ef20059dcef9198"
+ selector = {
+ "company": {"$gt": None},
+ "manager": {"$gt": None}
+ }
+ try:
+ self.db.find(selector, use_index=ddocid, sort=[{"manager":"desc"}])
+ except Exception as e:
+ self.assertEqual(e.response.status_code, 400)
+ else:
+ raise AssertionError("did not reject bad use_index")
+
# This doc will not be saved given the new ddoc validation code
# in couch_mrview
def test_manual_bad_view_idx01(self):
@@ -178,7 +250,7 @@ class MultiTextIndexSelectionTests(mango.UserDocsTests):
def test_multi_text_index_is_error(self):
try:
self.db.find({"$text": "a query"}, explain=True)
- except Exception, e:
+ except Exception as e:
self.assertEqual(e.response.status_code, 400)
def test_use_index_works(self):
diff --git a/src/mango/test/06-basic-text-test.py b/src/mango/test/06-basic-text-test.py
index 7f5ce6345..c02950c46 100644
--- a/src/mango/test/06-basic-text-test.py
+++ b/src/mango/test/06-basic-text-test.py
@@ -64,7 +64,6 @@ class BasicTextTests(mango.UserDocsTextTests):
# Nested Level
docs = self.db.find({"favorites.0.2": "Python"})
- print len(docs)
assert len(docs) == 1
for d in docs:
assert "Python" in d["favorites"][0][2]
@@ -451,14 +450,12 @@ class ElemMatchTests(mango.FriendDocsTextTests):
}
}
docs = self.db.find(q)
- print len(docs)
assert len(docs) == 1
assert docs[0]["bestfriends"] == ["Wolverine", "Cyclops"]
q = {"results": {"$elemMatch": {"$gte": 80, "$lt": 85}}}
docs = self.db.find(q)
- print len(docs)
assert len(docs) == 1
assert docs[0]["results"] == [82, 85, 88]
diff --git a/src/mango/test/07-text-custom-field-list-test.py b/src/mango/test/07-text-custom-field-list-test.py
index 50a5c0522..a43e33003 100644
--- a/src/mango/test/07-text-custom-field-list-test.py
+++ b/src/mango/test/07-text-custom-field-list-test.py
@@ -56,7 +56,7 @@ class CustomFieldsTest(mango.UserDocsTextTests):
try:
self.db.find({"selector": {"$or": [{"favorites": "Ruby"},
{"favorites.0":"Ruby"}]}})
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 400
def test_in_with_array(self):
@@ -82,7 +82,7 @@ class CustomFieldsTest(mango.UserDocsTextTests):
vals = ["Random Garbage", 52, {"Versions": {"Alpha": "Beta"}}]
try:
self.db.find({"favorites": {"$in": vals}})
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 400
def test_nin_with_array(self):
diff --git a/src/mango/test/08-text-limit-test.py b/src/mango/test/08-text-limit-test.py
index 191a1108a..4bc87b4b9 100644
--- a/src/mango/test/08-text-limit-test.py
+++ b/src/mango/test/08-text-limit-test.py
@@ -47,7 +47,6 @@ class LimitTests(mango.LimitDocsTextTests):
def test_limit_field5(self):
q = {"age": {"$exists": True}}
docs = self.db.find(q, limit=250)
- print len(docs)
assert len(docs) == 75
for d in docs:
assert d["age"] < 100
@@ -78,7 +77,7 @@ class LimitTests(mango.LimitDocsTextTests):
q = {"$or": [{"user_id" : {"$lt" : 100}}, {"filtered_array.[]": 1}]}
try:
self.db.find(q, limit=-1)
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 400
else:
raise AssertionError("Should have thrown error for negative limit")
@@ -87,7 +86,7 @@ class LimitTests(mango.LimitDocsTextTests):
q = {"$or": [{"user_id" : {"$lt" : 100}}, {"filtered_array.[]": 1}]}
try:
self.db.find(q, skip=-1)
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 400
else:
raise AssertionError("Should have thrown error for negative skip")
@@ -102,7 +101,6 @@ class LimitTests(mango.LimitDocsTextTests):
def run_bookmark_check(self, size):
- print size
q = {"age": {"$gt": 0}}
seen_docs = set()
bm = None
diff --git a/src/mango/test/09-text-sort-test.py b/src/mango/test/09-text-sort-test.py
index ae36a6a33..1c5557227 100644
--- a/src/mango/test/09-text-sort-test.py
+++ b/src/mango/test/09-text-sort-test.py
@@ -43,7 +43,7 @@ class SortTests(mango.UserDocsTextTests):
q = {"email": {"$gt": None}}
try:
self.db.find(q, sort=["email"])
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 400
else:
raise AssertionError("Should have thrown error for sort")
@@ -79,7 +79,7 @@ class SortTests(mango.UserDocsTextTests):
{"age": "34"}]}
try:
self.db.find(q, sort=["age"])
- except Exception, e:
+ except Exception as e:
assert e.response.status_code == 400
else:
raise AssertionError("Should have thrown error for sort")
diff --git a/src/mango/test/14-json-pagination.py b/src/mango/test/14-json-pagination.py
index ddac15662..ea06e0a2a 100644
--- a/src/mango/test/14-json-pagination.py
+++ b/src/mango/test/14-json-pagination.py
@@ -159,7 +159,7 @@ class PaginateJsonDocs(mango.DbPerClass):
def test_bad_bookmark(self):
try:
self.db.find({"_id": {"$gt": 0}}, bookmark="bad-bookmark")
- except Exception, e:
+ except Exception as e:
resp = e.response.json()
assert resp["error"] == "invalid_bookmark"
assert resp["reason"] == "Invalid bookmark value: \"bad-bookmark\""
@@ -171,7 +171,7 @@ class PaginateJsonDocs(mango.DbPerClass):
bookmark = 'g2wAAAABaANkABFub2RlMUBjb3VjaGRiLm5ldGwAAAACYQBiP____2poAkY_8AAAAAAAAGEHag'
try:
self.db.find({"_id": {"$gt": 0}}, bookmark=bookmark)
- except Exception, e:
+ except Exception as e:
resp = e.response.json()
assert resp["error"] == "invalid_bookmark"
assert e.response.status_code == 400
diff --git a/src/mango/test/16-index-selectors.py b/src/mango/test/16-index-selectors.py
index b18945609..3ce659ecf 100644
--- a/src/mango/test/16-index-selectors.py
+++ b/src/mango/test/16-index-selectors.py
@@ -73,20 +73,84 @@ DOCS = [
},
]
+oldschoolddoc = {
+ "_id": "_design/oldschool",
+ "language": "query",
+ "views": {
+ "oldschool": {
+ "map": {
+ "fields": {
+ "location": "asc"
+ },
+ "selector": {
+ "location": {"$gte": "FRA"}
+ }
+ },
+ "reduce": "_count",
+ "options": {
+ "def": {
+ "fields": [
+ "location"
+ ]
+ }
+ }
+ }
+ }
+}
+
+oldschoolddoctext = {
+ "_id": "_design/oldschooltext",
+ "language": "query",
+ "indexes": {
+ "oldschooltext": {
+ "index": {
+ "default_analyzer": "keyword",
+ "default_field": {},
+ "selector": {
+ "location": {"$gte": "FRA"}
+ },
+ "fields": [
+ {
+ "name": "location",
+ "type": "string"
+ }
+ ],
+ "index_array_lengths": True
+ },
+ "analyzer": {
+ "name": "perfield",
+ "default": "keyword",
+ "fields": {
+ "$default": "standard"
+ }
+ }
+ }
+ }
+}
+
class IndexSelectorJson(mango.DbPerClass):
def setUp(self):
self.db.recreate()
self.db.save_docs(copy.deepcopy(DOCS))
- def test_saves_selector_in_index(self):
+ def test_saves_partial_filter_selector_in_index(self):
selector = {"location": {"$gte": "FRA"}}
- self.db.create_index(["location"], selector=selector)
+ self.db.create_index(["location"], partial_filter_selector=selector)
indexes = self.db.list_indexes()
- self.assertEqual(indexes[1]["def"]["selector"], selector)
+ self.assertEqual(indexes[1]["def"]["partial_filter_selector"], selector)
+
+ def test_saves_selector_in_index_throws(self):
+ selector = {"location": {"$gte": "FRA"}}
+ try:
+ self.db.create_index(["location"], selector=selector)
+ except Exception as e:
+ assert e.response.status_code == 400
+ else:
+ raise AssertionError("bad index creation")
def test_uses_partial_index_for_query_selector(self):
selector = {"location": {"$gte": "FRA"}}
- self.db.create_index(["location"], selector=selector, ddoc="Selected", name="Selected")
+ self.db.create_index(["location"], partial_filter_selector=selector, ddoc="Selected", name="Selected")
resp = self.db.find(selector, explain=True, use_index='Selected')
self.assertEqual(resp["index"]["name"], "Selected")
docs = self.db.find(selector, use_index='Selected')
@@ -95,7 +159,7 @@ class IndexSelectorJson(mango.DbPerClass):
def test_uses_partial_index_with_different_selector(self):
selector = {"location": {"$gte": "FRA"}}
selector2 = {"location": {"$gte": "A"}}
- self.db.create_index(["location"], selector=selector, ddoc="Selected", name="Selected")
+ self.db.create_index(["location"], partial_filter_selector=selector, ddoc="Selected", name="Selected")
resp = self.db.find(selector2, explain=True, use_index='Selected')
self.assertEqual(resp["index"]["name"], "Selected")
docs = self.db.find(selector2, use_index='Selected')
@@ -103,28 +167,36 @@ class IndexSelectorJson(mango.DbPerClass):
def test_doesnot_use_selector_when_not_specified(self):
selector = {"location": {"$gte": "FRA"}}
- self.db.create_index(["location"], selector=selector, ddoc="Selected", name="Selected")
+ self.db.create_index(["location"], partial_filter_selector=selector, ddoc="Selected", name="Selected")
resp = self.db.find(selector, explain=True)
self.assertEqual(resp["index"]["name"], "_all_docs")
def test_doesnot_use_selector_when_not_specified_with_index(self):
selector = {"location": {"$gte": "FRA"}}
- self.db.create_index(["location"], selector=selector, ddoc="Selected", name="Selected")
+ self.db.create_index(["location"], partial_filter_selector=selector, ddoc="Selected", name="Selected")
self.db.create_index(["location"], name="NotSelected")
resp = self.db.find(selector, explain=True)
self.assertEqual(resp["index"]["name"], "NotSelected")
+ def test_old_selector_still_supported(self):
+ selector = {"location": {"$gte": "FRA"}}
+ self.db.save_doc(oldschoolddoc)
+ resp = self.db.find(selector, explain=True, use_index='oldschool')
+ self.assertEqual(resp["index"]["name"], "oldschool")
+ docs = self.db.find(selector, use_index='oldschool')
+ self.assertEqual(len(docs), 3)
+
@unittest.skipUnless(mango.has_text_service(), "requires text service")
- def test_text_saves_selector_in_index(self):
+ def test_text_saves_partialfilterselector_in_index(self):
selector = {"location": {"$gte": "FRA"}}
- self.db.create_text_index(fields=[{"name":"location", "type":"string"}], selector=selector)
+ self.db.create_text_index(fields=[{"name":"location", "type":"string"}], partial_filter_selector=selector)
indexes = self.db.list_indexes()
- self.assertEqual(indexes[1]["def"]["selector"], selector)
+ self.assertEqual(indexes[1]["def"]["partial_filter_selector"], selector)
@unittest.skipUnless(mango.has_text_service(), "requires text service")
def test_text_uses_partial_index_for_query_selector(self):
selector = {"location": {"$gte": "FRA"}}
- self.db.create_text_index(fields=[{"name":"location", "type":"string"}], selector=selector, ddoc="Selected", name="Selected")
+ self.db.create_text_index(fields=[{"name":"location", "type":"string"}], partial_filter_selector=selector, ddoc="Selected", name="Selected")
resp = self.db.find(selector, explain=True, use_index='Selected')
self.assertEqual(resp["index"]["name"], "Selected")
docs = self.db.find(selector, use_index='Selected', fields=['_id', 'location'])
@@ -134,7 +206,7 @@ class IndexSelectorJson(mango.DbPerClass):
def test_text_uses_partial_index_with_different_selector(self):
selector = {"location": {"$gte": "FRA"}}
selector2 = {"location": {"$gte": "A"}}
- self.db.create_text_index(fields=[{"name":"location", "type":"string"}], selector=selector, ddoc="Selected", name="Selected")
+ self.db.create_text_index(fields=[{"name":"location", "type":"string"}], partial_filter_selector=selector, ddoc="Selected", name="Selected")
resp = self.db.find(selector2, explain=True, use_index='Selected')
self.assertEqual(resp["index"]["name"], "Selected")
docs = self.db.find(selector2, use_index='Selected')
@@ -143,14 +215,23 @@ class IndexSelectorJson(mango.DbPerClass):
@unittest.skipUnless(mango.has_text_service(), "requires text service")
def test_text_doesnot_use_selector_when_not_specified(self):
selector = {"location": {"$gte": "FRA"}}
- self.db.create_text_index(fields=[{"name":"location", "type":"string"}], selector=selector, ddoc="Selected", name="Selected")
+ self.db.create_text_index(fields=[{"name":"location", "type":"string"}], partial_filter_selector=selector, ddoc="Selected", name="Selected")
resp = self.db.find(selector, explain=True)
self.assertEqual(resp["index"]["name"], "_all_docs")
@unittest.skipUnless(mango.has_text_service(), "requires text service")
def test_text_doesnot_use_selector_when_not_specified_with_index(self):
selector = {"location": {"$gte": "FRA"}}
- self.db.create_text_index(fields=[{"name":"location", "type":"string"}], selector=selector, ddoc="Selected", name="Selected")
+ self.db.create_text_index(fields=[{"name":"location", "type":"string"}], partial_filter_selector=selector, ddoc="Selected", name="Selected")
self.db.create_text_index(fields=[{"name":"location", "type":"string"}], name="NotSelected")
resp = self.db.find(selector, explain=True)
- self.assertEqual(resp["index"]["name"], "NotSelected") \ No newline at end of file
+ self.assertEqual(resp["index"]["name"], "NotSelected")
+
+ @unittest.skipUnless(mango.has_text_service(), "requires text service")
+ def test_text_old_selector_still_supported(self):
+ selector = {"location": {"$gte": "FRA"}}
+ self.db.save_doc(oldschoolddoctext)
+ resp = self.db.find(selector, explain=True, use_index='oldschooltext')
+ self.assertEqual(resp["index"]["name"], "oldschooltext")
+ docs = self.db.find(selector, use_index='oldschooltext')
+ self.assertEqual(len(docs), 3) \ No newline at end of file
diff --git a/src/mango/test/mango.py b/src/mango/test/mango.py
index 2c8971485..a275a23d0 100644
--- a/src/mango/test/mango.py
+++ b/src/mango/test/mango.py
@@ -44,7 +44,7 @@ class Database(object):
return "http://{}:{}/{}".format(self.host, self.port, self.dbname)
def path(self, parts):
- if isinstance(parts, (str, unicode)):
+ if isinstance(parts, ("".__class__, u"".__class__)):
parts = [parts]
return "/".join([self.url] + parts)
@@ -84,7 +84,8 @@ class Database(object):
r.raise_for_status()
return r.json()
- def create_index(self, fields, idx_type="json", name=None, ddoc=None, selector=None):
+ def create_index(self, fields, idx_type="json", name=None, ddoc=None,
+ partial_filter_selector=None, selector=None):
body = {
"index": {
"fields": fields
@@ -98,6 +99,8 @@ class Database(object):
body["ddoc"] = ddoc
if selector is not None:
body["index"]["selector"] = selector
+ if partial_filter_selector is not None:
+ body["index"]["partial_filter_selector"] = partial_filter_selector
body = json.dumps(body)
r = self.sess.post(self.path("_index"), data=body)
r.raise_for_status()
@@ -105,8 +108,9 @@ class Database(object):
assert r.json()["name"] is not None
return r.json()["result"] == "created"
- def create_text_index(self, analyzer=None, selector=None, idx_type="text",
- default_field=None, fields=None, name=None, ddoc=None,index_array_lengths=None):
+ def create_text_index(self, analyzer=None, idx_type="text",
+ partial_filter_selector=None, default_field=None, fields=None,
+ name=None, ddoc=None,index_array_lengths=None):
body = {
"index": {
},
@@ -121,8 +125,8 @@ class Database(object):
body["index"]["default_field"] = default_field
if index_array_lengths is not None:
body["index"]["index_array_lengths"] = index_array_lengths
- if selector is not None:
- body["index"]["selector"] = selector
+ if partial_filter_selector is not None:
+ body["index"]["partial_filter_selector"] = partial_filter_selector
if fields is not None:
body["index"]["fields"] = fields
if ddoc is not None:
diff --git a/src/mango/test/user_docs.py b/src/mango/test/user_docs.py
index 9896e5596..02ffe9ffc 100644
--- a/src/mango/test/user_docs.py
+++ b/src/mango/test/user_docs.py
@@ -493,7 +493,6 @@ DOCS = [
},
"company": "Pharmex",
"email": "faithhess@pharmex.com",
- "manager": True,
"favorites": [
"Erlang",
"Python",
diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl
index 41278025b..e2cbb2ec6 100644
--- a/src/mem3/src/mem3.erl
+++ b/src/mem3/src/mem3.erl
@@ -145,13 +145,13 @@ get_shard(DbName, Node, Range) ->
local_shards(DbName) ->
mem3_shards:local(DbName).
-shard_suffix(#db{name=DbName}) ->
- shard_suffix(DbName);
-shard_suffix(DbName0) ->
+shard_suffix(DbName0) when is_binary(DbName0) ->
Shard = hd(shards(DbName0)),
<<"shards/", _:8/binary, "-", _:8/binary, "/", DbName/binary>> =
Shard#shard.name,
- filename:extension(binary_to_list(DbName)).
+ filename:extension(binary_to_list(DbName));
+shard_suffix(Db) ->
+ shard_suffix(couch_db:name(Db)).
fold_shards(Fun, Acc) ->
mem3_shards:fold(Fun, Acc).
@@ -295,10 +295,11 @@ group_by_range(Shards) ->
% quorum functions
-quorum(#db{name=DbName}) ->
- quorum(DbName);
-quorum(DbName) ->
- n(DbName) div 2 + 1.
+quorum(DbName) when is_binary(DbName) ->
+ n(DbName) div 2 + 1;
+quorum(Db) ->
+ quorum(couch_db:name(Db)).
+
node(#shard{node=Node}) ->
Node;
diff --git a/src/mem3/src/mem3_cluster.erl b/src/mem3/src/mem3_cluster.erl
new file mode 100644
index 000000000..7e3d477cb
--- /dev/null
+++ b/src/mem3/src/mem3_cluster.erl
@@ -0,0 +1,161 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Maintain cluster stability information. A cluster is considered stable if there
+% were no changes to it during a given period of time.
+%
+% To be notified of cluster stability / instability the owner module must
+% implement the mem3_cluster behavior. When cluster membership changes,
+% cluster_unstable behavior callback will be called. After that, if there are no more
+% changes to the cluster, then cluster_stable callback will be called.
+%
+% The period is passed in as start argument but it can also be set dynamically
+% via the set_period/2 API call.
+%
+% In some cases it might be useful to have a shorter period during startup.
+% That can be configured via the StartPeriod argument. If the time since start
+% is less than a full period, then the StartPeriod is used as the period.
+
+
+-module(mem3_cluster).
+
+-behaviour(gen_server).
+
+-export([
+ start_link/4,
+ set_period/2
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-callback cluster_stable(Context :: term()) -> NewContext :: term().
+-callback cluster_unstable(Context :: term()) -> NewContext :: term().
+
+
+-record(state, {
+ mod :: atom(),
+ ctx :: term(),
+ start_time :: erlang:timestamp(),
+ last_change :: erlang:timestamp(),
+ period :: integer(),
+ start_period :: integer(),
+ timer :: reference()
+}).
+
+
+-spec start_link(module(), term(), integer(), integer()) ->
+ {ok, pid()} | ignore | {error, term()}.
+start_link(Module, Context, StartPeriod, Period)
+ when is_atom(Module), is_integer(StartPeriod), is_integer(Period) ->
+ gen_server:start_link(?MODULE, [Module, Context, StartPeriod, Period], []).
+
+
+-spec set_period(pid(), integer()) -> ok.
+set_period(Server, Period) when is_pid(Server), is_integer(Period) ->
+ gen_server:cast(Server, {set_period, Period}).
+
+
+% gen_server callbacks
+
+init([Module, Context, StartPeriod, Period]) ->
+ net_kernel:monitor_nodes(true),
+ {ok, #state{
+ mod = Module,
+ ctx = Context,
+ start_time = os:timestamp(),
+ last_change = os:timestamp(),
+ period = Period,
+ start_period = StartPeriod,
+ timer = new_timer(StartPeriod)
+ }}.
+
+
+terminate(_Reason, _State) ->
+ ok.
+
+handle_call(_Msg, _From, State) ->
+ {reply, ignored, State}.
+
+
+handle_cast({set_period, Period}, State) ->
+ {noreply, State#state{period = Period}}.
+
+
+handle_info({nodeup, _Node}, State) ->
+ {noreply, cluster_changed(State)};
+
+handle_info({nodedown, _Node}, State) ->
+ {noreply, cluster_changed(State)};
+
+handle_info(stability_check, #state{mod = Mod, ctx = Ctx} = State) ->
+ erlang:cancel_timer(State#state.timer),
+ case now_diff_sec(State#state.last_change) > interval(State) of
+ true ->
+ {noreply, State#state{ctx = Mod:cluster_stable(Ctx)}};
+ false ->
+ Timer = new_timer(interval(State)),
+ {noreply, State#state{timer = Timer}}
+ end.
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+%% Internal functions
+
+-spec cluster_changed(#state{}) -> #state{}.
+cluster_changed(#state{mod = Mod, ctx = Ctx} = State) ->
+ State#state{
+ last_change = os:timestamp(),
+ timer = new_timer(interval(State)),
+ ctx = Mod:cluster_unstable(Ctx)
+ }.
+
+
+-spec new_timer(non_neg_integer()) -> reference().
+new_timer(IntervalSec) ->
+ erlang:send_after(IntervalSec * 1000, self(), stability_check).
+
+
+% For the first Period seconds after node boot we check cluster stability every
+% StartPeriod seconds. Once the initial Period seconds have passed we continue
+% to monitor once every Period seconds
+-spec interval(#state{}) -> non_neg_integer().
+interval(#state{period = Period, start_period = StartPeriod,
+ start_time = T0}) ->
+ case now_diff_sec(T0) > Period of
+ true ->
+ % Normal operation
+ Period;
+ false ->
+ % During startup
+ StartPeriod
+ end.
+
+
+-spec now_diff_sec(erlang:timestamp()) -> non_neg_integer().
+now_diff_sec(Time) ->
+ case timer:now_diff(os:timestamp(), Time) of
+ USec when USec < 0 ->
+ 0;
+ USec when USec >= 0 ->
+ USec / 1000000
+ end.
diff --git a/src/mem3/src/mem3_httpd.erl b/src/mem3/src/mem3_httpd.erl
index 535815862..571f06370 100644
--- a/src/mem3/src/mem3_httpd.erl
+++ b/src/mem3/src/mem3_httpd.erl
@@ -32,7 +32,7 @@ handle_membership_req(#httpd{path_parts=[<<"_membership">>]}=Req) ->
handle_shards_req(#httpd{method='GET',
path_parts=[_DbName, <<"_shards">>]} = Req, Db) ->
- DbName = mem3:dbname(Db#db.name),
+ DbName = mem3:dbname(couch_db:name(Db)),
Shards = mem3:shards(DbName),
JsonShards = json_shards(Shards, dict:new()),
couch_httpd:send_json(Req, {[
@@ -40,7 +40,7 @@ handle_shards_req(#httpd{method='GET',
]});
handle_shards_req(#httpd{method='GET',
path_parts=[_DbName, <<"_shards">>, DocId]} = Req, Db) ->
- DbName = mem3:dbname(Db#db.name),
+ DbName = mem3:dbname(couch_db:name(Db)),
Shards = mem3:shards(DbName, DocId),
{[{Shard, Dbs}]} = json_shards(Shards, dict:new()),
couch_httpd:send_json(Req, {[
diff --git a/src/mem3/src/mem3_nodes.erl b/src/mem3/src/mem3_nodes.erl
index f31891a7b..555389b90 100644
--- a/src/mem3/src/mem3_nodes.erl
+++ b/src/mem3/src/mem3_nodes.erl
@@ -92,7 +92,7 @@ code_change(_OldVsn, #state{}=State, _Extra) ->
initialize_nodelist() ->
DbName = config:get("mem3", "nodes_db", "_nodes"),
{ok, Db} = mem3_util:ensure_exists(DbName),
- {ok, _, Db} = couch_btree:fold(Db#db.id_tree, fun first_fold/3, Db, []),
+ {ok, _} = couch_db:fold_docs(Db, fun first_fold/2, Db, []),
% add self if not already present
case ets:lookup(?MODULE, node()) of
[_] ->
@@ -103,13 +103,13 @@ initialize_nodelist() ->
{ok, _} = couch_db:update_doc(Db, Doc, [])
end,
couch_db:close(Db),
- Db#db.update_seq.
+ couch_db:get_update_seq(Db).
-first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, _, Acc) ->
+first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) ->
{ok, Acc};
-first_fold(#full_doc_info{deleted=true}, _, Acc) ->
+first_fold(#full_doc_info{deleted=true}, Acc) ->
{ok, Acc};
-first_fold(#full_doc_info{id=Id}=DocInfo, _, Db) ->
+first_fold(#full_doc_info{id=Id}=DocInfo, Db) ->
{ok, #doc{body={Props}}} = couch_db:open_doc(Db, DocInfo, [ejson_body]),
ets:insert(?MODULE, {mem3_util:to_atom(Id), Props}),
{ok, Db}.
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 826604ab1..e178fad6d 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -170,16 +170,15 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
end.
-repl(#db{name=DbName, seq_tree=Bt}=Db, Acc0) ->
- erlang:put(io_priority, {internal_repl, DbName}),
+repl(Db, Acc0) ->
+ erlang:put(io_priority, {internal_repl, couch_db:name(Db)}),
#acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0#acc{source = Db}),
case Seq >= couch_db:get_update_seq(Db) of
true ->
{ok, 0};
false ->
Fun = fun ?MODULE:changes_enumerator/3,
- FoldOpts = [{start_key, Seq + 1}],
- {ok, _, Acc2} = couch_btree:fold(Bt, Fun, Acc1, FoldOpts),
+ {ok, _, Acc2} = couch_db:enum_docs_since(Db, Seq, Fun, Acc1, []),
{ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
{ok, couch_db:count_changes_since(Db, LastSeq)}
end.
@@ -343,7 +342,7 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) ->
SrcUUID = couch_db:get_uuid(SrcDb),
S = couch_util:encodeBase64Url(crypto:hash(md5, term_to_binary(SrcUUID))),
DocIdPrefix = <<"_local/shard-sync-", S/binary, "-">>,
- FoldFun = fun({DocId, {Rev0, {BodyProps}}}, _, _) ->
+ FoldFun = fun({DocId, {Rev0, {BodyProps}}}, _) ->
TgtUUID = couch_util:get_value(<<"target_uuid">>, BodyProps, <<>>),
case is_prefix(DocIdPrefix, DocId) of
true ->
@@ -360,10 +359,10 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) ->
end
end,
Options = [{start_key, DocIdPrefix}],
- case couch_btree:fold(SrcDb#db.local_tree, FoldFun, not_found, Options) of
- {ok, _, {TgtUUID, Doc}} ->
+ case couch_db:fold_local_docs(SrcDb, FoldFun, not_found, Options) of
+ {ok, {TgtUUID, Doc}} ->
{ok, TgtUUID, Doc};
- {ok, _, not_found} ->
+ {ok, not_found} ->
{not_found, missing};
Else ->
couch_log:error("Error finding replication doc: ~w", [Else]),
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index 93cb99ac9..c2bd58fdf 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -84,11 +84,11 @@ load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
erlang:put(io_priority, {internal_repl, DbName}),
case get_or_create_db(DbName, [?ADMIN_CTX]) of
- {ok, #db{update_seq = TargetSeq} = Db} ->
+ {ok, Db} ->
NewEntry = {[
{<<"target_node">>, atom_to_binary(node(), utf8)},
{<<"target_uuid">>, couch_db:get_uuid(Db)},
- {<<"target_seq">>, TargetSeq}
+ {<<"target_seq">>, couch_db:get_update_seq(Db)}
] ++ NewEntry0},
Body = {[
{<<"seq">>, SourceSeq},
diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl
index 8d9cfb9c7..be7e5aaaf 100644
--- a/src/mem3/src/mem3_shards.erl
+++ b/src/mem3/src/mem3_shards.erl
@@ -323,7 +323,7 @@ get_update_seq() ->
DbName = config:get("mem3", "shards_db", "_dbs"),
{ok, Db} = mem3_util:ensure_exists(DbName),
couch_db:close(Db),
- Db#db.update_seq.
+ couch_db:get_update_seq(Db).
listen_for_changes(Since) ->
DbName = config:get("mem3", "shards_db", "_dbs"),
@@ -380,7 +380,7 @@ load_shards_from_disk(DbName) when is_binary(DbName) ->
couch_db:close(Db)
end.
-load_shards_from_db(#db{} = ShardDb, DbName) ->
+load_shards_from_db(ShardDb, DbName) ->
case couch_db:open_doc(ShardDb, DbName, [ejson_body]) of
{ok, #doc{body = {Props}}} ->
Seq = couch_db:get_update_seq(ShardDb),
@@ -659,7 +659,7 @@ t_spawn_writer_in_load_shards_from_db() ->
meck:expect(couch_db, get_update_seq, 1, 1),
meck:expect(mem3_util, build_ordered_shards, 2, mock_shards()),
erlang:register(?MODULE, self()), % register to get cache_insert cast
- load_shards_from_db(#db{name = <<"testdb">>}, ?DB),
+ load_shards_from_db(test_util:fake_db([{name, <<"testdb">>}]), ?DB),
meck:validate(couch_db),
meck:validate(mem3_util),
Cast = receive
@@ -746,8 +746,8 @@ mem3_shards_changes_test_() -> {
setup_changes() ->
- ok = meck:expect(mem3_util, ensure_exists, ['_'],
- {ok, #db{name = <<"dbs">>, update_seq = 0}}),
+ RespDb = test_util:fake_db([{name, <<"dbs">>}, {update_seq, 0}]),
+ ok = meck:expect(mem3_util, ensure_exists, ['_'], {ok, RespDb}),
ok = meck:expect(couch_db, close, ['_'], ok),
ok = application:start(config),
{ok, Pid} = ?MODULE:start_link(),
diff --git a/src/mem3/test/mem3_cluster_test.erl b/src/mem3/test/mem3_cluster_test.erl
new file mode 100644
index 000000000..4610d64bd
--- /dev/null
+++ b/src/mem3/test/mem3_cluster_test.erl
@@ -0,0 +1,133 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_cluster_test).
+
+-behavior(mem3_cluster).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-export([
+ cluster_unstable/1,
+ cluster_stable/1
+]).
+
+
+% Mem3 cluster callbacks
+
+cluster_unstable(Server) ->
+ Server ! cluster_unstable,
+ Server.
+
+cluster_stable(Server) ->
+ Server ! cluster_stable,
+ Server.
+
+
+mem3_cluster_test_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ t_cluster_stable_during_startup_period(),
+ t_cluster_unstable_delivered_on_nodeup(),
+ t_cluster_unstable_delivered_on_nodedown(),
+ t_wait_period_is_reset_after_last_change()
+ ]
+ }.
+
+
+t_cluster_stable_during_startup_period() ->
+ ?_test(begin
+ {ok, Pid} = mem3_cluster:start_link(?MODULE, self(), 1, 2),
+ register(?MODULE, Pid),
+ receive
+ cluster_stable ->
+ ?assert(true)
+ after 1500 ->
+ throw(timeout)
+ end,
+ unlink(Pid),
+ exit(Pid, kill)
+ end).
+
+
+t_cluster_unstable_delivered_on_nodeup() ->
+ ?_test(begin
+ {ok, Pid} = mem3_cluster:start_link(?MODULE, self(), 1, 2),
+ register(?MODULE, Pid),
+ Pid ! {nodeup, node()},
+ receive
+ cluster_unstable ->
+ ?assert(true)
+ after 1000 ->
+ throw(timeout)
+ end,
+ unlink(Pid),
+ exit(Pid, kill)
+ end).
+
+
+t_cluster_unstable_delivered_on_nodedown() ->
+ ?_test(begin
+ {ok, Pid} = mem3_cluster:start_link(?MODULE, self(), 1, 2),
+ register(?MODULE, Pid),
+ Pid ! {nodedown, node()},
+ receive
+ cluster_unstable ->
+ ?assert(true)
+ after 1000 ->
+ throw(timeout)
+ end,
+ unlink(Pid),
+ exit(Pid, kill)
+ end).
+
+
+t_wait_period_is_reset_after_last_change() ->
+ ?_test(begin
+ {ok, Pid} = mem3_cluster:start_link(?MODULE, self(), 1, 1),
+ register(?MODULE, Pid),
+ timer:sleep(800),
+        Pid ! {nodeup, node()}, % after 800 ms send a nodeup
+ receive
+ cluster_stable ->
+ ?assert(false)
+ after 400 ->
+ ?assert(true) % stability check should have been reset
+ end,
+ timer:sleep(1000),
+ receive
+ cluster_stable ->
+ ?assert(true)
+ after 0 ->
+ ?assert(false) % cluster_stable arrives after enough quiet time
+ end,
+ unlink(Pid),
+ exit(Pid, kill)
+ end).
+
+
+% Test helper functions
+
+setup() ->
+ ok.
+
+teardown(_) ->
+ case whereis(?MODULE) of
+ undefined ->
+ ok;
+ Pid when is_pid(Pid) ->
+ unlink(Pid),
+ exit(Pid, kill)
+ end.
diff --git a/src/rexi/src/rexi_server_mon.erl b/src/rexi/src/rexi_server_mon.erl
index e6b5eb98e..86fecaff6 100644
--- a/src/rexi/src/rexi_server_mon.erl
+++ b/src/rexi/src/rexi_server_mon.erl
@@ -14,6 +14,7 @@
-module(rexi_server_mon).
-behaviour(gen_server).
+-behaviour(mem3_cluster).
-vsn(1).
@@ -32,8 +33,13 @@
code_change/3
]).
+-export([
+ cluster_stable/1,
+ cluster_unstable/1
+]).
--define(INTERVAL, 60000).
+
+-define(CLUSTER_STABILITY_PERIOD_SEC, 15).
start_link(ChildMod) ->
@@ -45,9 +51,23 @@ status() ->
gen_server:call(?MODULE, status).
+% Mem3 cluster callbacks
+
+cluster_unstable(Server) ->
+ couch_log:notice("~s : cluster unstable", [?MODULE]),
+ gen_server:cast(Server, cluster_unstable),
+ Server.
+
+cluster_stable(Server) ->
+ gen_server:cast(Server, cluster_stable),
+ Server.
+
+
+% gen_server callbacks
+
init(ChildMod) ->
- net_kernel:monitor_nodes(true),
- erlang:send(self(), check_nodes),
+ {ok, _Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(),
+ ?CLUSTER_STABILITY_PERIOD_SEC, ?CLUSTER_STABILITY_PERIOD_SEC),
{ok, ChildMod}.
@@ -67,24 +87,27 @@ handle_call(Msg, _From, St) ->
couch_log:notice("~s ignored_call ~w", [?MODULE, Msg]),
{reply, ignored, St}.
-
-handle_cast(Msg, St) ->
- couch_log:notice("~s ignored_cast ~w", [?MODULE, Msg]),
- {noreply, St}.
-
-
-handle_info({nodeup, _}, ChildMod) ->
+% If cluster is unstable a node was added or just removed. Check if any nodes
+% can be started, but do not immediately stop nodes, defer that till cluster
+% stabilized.
+handle_cast(cluster_unstable, ChildMod) ->
+ couch_log:notice("~s : cluster unstable", [ChildMod]),
start_servers(ChildMod),
{noreply, ChildMod};
-handle_info({nodedown, _}, St) ->
- {noreply, St};
-
-handle_info(check_nodes, ChildMod) ->
+% When cluster is stable, start any servers for new nodes and stop servers for
+% the ones that disconnected.
+handle_cast(cluster_stable, ChildMod) ->
+ couch_log:notice("~s : cluster stable", [ChildMod]),
start_servers(ChildMod),
- erlang:send_after(?INTERVAL, self(), check_nodes),
+ stop_servers(ChildMod),
{noreply, ChildMod};
+handle_cast(Msg, St) ->
+ couch_log:notice("~s ignored_cast ~w", [?MODULE, Msg]),
+ {noreply, St}.
+
+
handle_info(Msg, St) ->
couch_log:notice("~s ignored_info ~w", [?MODULE, Msg]),
{noreply, St}.
@@ -101,13 +124,27 @@ start_servers(ChildMod) ->
{ok, _} = start_server(ChildMod, Id)
end, missing_servers(ChildMod)).
+stop_servers(ChildMod) ->
+ lists:foreach(fun(Id) ->
+ ok = stop_server(ChildMod, Id)
+ end, extra_servers(ChildMod)).
+
+
+server_ids(ChildMod) ->
+ Nodes = [node() | nodes()],
+ [list_to_atom(lists:concat([ChildMod, "_", Node])) || Node <- Nodes].
+
+
+running_servers(ChildMod) ->
+ [Id || {Id, _, _, _} <- supervisor:which_children(sup_module(ChildMod))].
+
missing_servers(ChildMod) ->
- ServerIds = [list_to_atom(lists:concat([ChildMod, "_", Node]))
- || Node <- [node() | nodes()]],
- SupModule = sup_module(ChildMod),
- ChildIds = [Id || {Id, _, _, _} <- supervisor:which_children(SupModule)],
- ServerIds -- ChildIds.
+ server_ids(ChildMod) -- running_servers(ChildMod).
+
+
+extra_servers(ChildMod) ->
+ running_servers(ChildMod) -- server_ids(ChildMod).
start_server(ChildMod, ChildId) ->
@@ -126,5 +163,12 @@ start_server(ChildMod, ChildId) ->
erlang:error(Else)
end.
+
+stop_server(ChildMod, ChildId) ->
+ SupMod = sup_module(ChildMod),
+ ok = supervisor:terminate_child(SupMod, ChildId),
+ ok = supervisor:delete_child(SupMod, ChildId).
+
+
sup_module(ChildMod) ->
list_to_atom(lists:concat([ChildMod, "_sup"])).