commit 81ad7005d01fbc03bd7573aaa97f0b44c137e6c3 (patch)
tree 67a5072bb6df37cd545b40955b40bfca03e6edf7
parent 8f39d4191040e7519a573ce1b3cdf5b674bf8e4f (diff)
parent c3c5429180de14a2b139f7741c934143ef73988c (diff)
author Jan Lehnardt <jan@apache.org> 2017-04-01 19:38:53 +0200
committer Jan Lehnardt <jan@apache.org> 2017-04-01 19:38:53 +0200
Add 'src/mem3/' from commit 'c3c5429180de14a2b139f7741c934143ef73988c'
git-subtree-dir: src/mem3
git-subtree-mainline: 8f39d4191040e7519a573ce1b3cdf5b674bf8e4f
git-subtree-split: c3c5429180de14a2b139f7741c934143ef73988c
-rw-r--r--  src/mem3/LICENSE                            202
-rw-r--r--  src/mem3/README.md                           43
-rw-r--r--  src/mem3/include/mem3.hrl                    52
-rw-r--r--  src/mem3/priv/stats_descriptions.cfg         12
-rw-r--r--  src/mem3/src/mem3.app.src                    53
-rw-r--r--  src/mem3/src/mem3.erl                       308
-rw-r--r--  src/mem3/src/mem3_app.erl                    21
-rw-r--r--  src/mem3/src/mem3_epi.erl                    50
-rw-r--r--  src/mem3/src/mem3_httpd.erl                  66
-rw-r--r--  src/mem3/src/mem3_httpd_handlers.erl         23
-rw-r--r--  src/mem3/src/mem3_nodes.erl                 146
-rw-r--r--  src/mem3/src/mem3_rep.erl                   481
-rw-r--r--  src/mem3/src/mem3_rpc.erl                   586
-rw-r--r--  src/mem3/src/mem3_shards.erl                419
-rw-r--r--  src/mem3/src/mem3_sup.erl                    35
-rw-r--r--  src/mem3/src/mem3_sync.erl                  319
-rw-r--r--  src/mem3/src/mem3_sync_event.erl             86
-rw-r--r--  src/mem3/src/mem3_sync_event_listener.erl   309
-rw-r--r--  src/mem3/src/mem3_sync_nodes.erl            115
-rw-r--r--  src/mem3/src/mem3_sync_security.erl         107
-rw-r--r--  src/mem3/src/mem3_util.erl                  253
-rw-r--r--  src/mem3/test/01-config-default.ini          14
-rw-r--r--  src/mem3/test/mem3_util_test.erl            167
23 files changed, 3867 insertions(+), 0 deletions(-)
diff --git a/src/mem3/LICENSE b/src/mem3/LICENSE
new file mode 100644
index 000000000..f6cd2bc80
--- /dev/null
+++ b/src/mem3/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/src/mem3/README.md b/src/mem3/README.md
new file mode 100644
index 000000000..1e1e0bd2c
--- /dev/null
+++ b/src/mem3/README.md
@@ -0,0 +1,43 @@
+## mem3
+
+Mem3 is the node membership application for clustered [CouchDB][1]. It has
+been part of CouchDB since version 2.0 and tracks two essential pieces of
+cluster state:
+
+ 1. member nodes
+ 2. node-to-shard mappings for each database
+
+Both the nodes and shards are tracked in node-local couch databases. Shards
+are heavily used, so an ETS cache is also maintained for low-latency lookups.
+The nodes and shards are synchronized via continuous CouchDB replication,
+which serves as 'gossip' in Dynamo parlance. The shards ETS cache is kept in
+sync based on membership and database event listeners.
+
+An important point: CouchDB does not necessarily divide each database into
+equal shards across the nodes of a cluster. For instance, in a 20-node
+cluster you may need to create a small database with very few documents.
+For efficiency, you might create it with Q=4 and keep the default of N=3.
+That yields only 12 shard copies in total (4 ranges x 3 replicas), so 8
+nodes will hold none of this database's data. Given this feature, shard
+usage is evened out across the cluster by varying the 'start' node for each
+database's shards.
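+
+For illustration, a cluster node can inspect a database's shard map from a
+remote shell. A sketch, assuming a hypothetical Q=4 database named
+`small_db` (shard names, timestamp suffix, and node names are illustrative):
+
+    1> mem3:shards(<<"small_db">>).
+    [#shard{name = <<"shards/00000000-3fffffff/small_db.1491066000">>,
+            node = 'node1@127.0.0.1', dbname = <<"small_db">>,
+            range = [0, 1073741823]},
+     ...]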
+
+Shard splitting and merging are still immature features and will need
+attention in the near term. We believe both operations can be implemented
+and performed while the database remains online.
+
+### Getting Started
+
+Mem3 requires Erlang/OTP R13B03 or higher and can be built with [rebar][2],
+which comes bundled in the repository. Rebar needs to be able to find the
+`couch_db.hrl` header file; one way to accomplish this is to set `ERL_LIBS`
+to point to the apps subdirectory of a CouchDB checkout, e.g.
+
+ ERL_LIBS="/usr/local/src/couchdb/apps" ./rebar compile
+
+### License
+[Apache 2.0][3]
+
+[1]: http://couchdb.apache.org
+[2]: http://github.com/rebar/rebar
+[3]: http://www.apache.org/licenses/LICENSE-2.0.html
diff --git a/src/mem3/include/mem3.hrl b/src/mem3/include/mem3.hrl
new file mode 100644
index 000000000..d6ac0bed2
--- /dev/null
+++ b/src/mem3/include/mem3.hrl
@@ -0,0 +1,52 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% type specification hacked to suppress dialyzer warning re: match spec
+-record(shard, {
+ name :: binary() | '_',
+ node :: node() | '_',
+ dbname :: binary(),
+ range :: [non_neg_integer() | '$1' | '$2'] | '_',
+ ref :: reference() | 'undefined' | '_'
+}).
+
+%% Do not reference outside of mem3.
+-record(ordered_shard, {
+ name :: binary() | '_',
+ node :: node() | '_',
+ dbname :: binary(),
+ range :: [non_neg_integer() | '$1' | '$2'] | '_',
+ ref :: reference() | 'undefined' | '_',
+ order :: non_neg_integer() | 'undefined' | '_'
+}).
+
+%% types
+-type join_type() :: init | join | replace | leave.
+-type join_order() :: non_neg_integer().
+-type options() :: list().
+-type mem_node() :: {join_order(), node(), options()}.
+-type mem_node_list() :: [mem_node()].
+-type arg_options() :: {test, boolean()}.
+-type args() :: [] | [arg_options()].
+-type test() :: undefined | node().
+-type epoch() :: float().
+-type clock() :: {node(), epoch()}.
+-type vector_clock() :: [clock()].
+-type ping_node() :: node() | nil.
+-type gossip_fun() :: call | cast.
+
+-type part() :: #shard{}.
+-type fullmap() :: [part()].
+-type ref_part_map() :: {reference(), part()}.
+-type tref() :: reference().
+-type np() :: {node(), part()}.
+-type beg_acc() :: [integer()].
diff --git a/src/mem3/priv/stats_descriptions.cfg b/src/mem3/priv/stats_descriptions.cfg
new file mode 100644
index 000000000..569d16ac3
--- /dev/null
+++ b/src/mem3/priv/stats_descriptions.cfg
@@ -0,0 +1,12 @@
+{[mem3, shard_cache, eviction], [
+ {type, counter},
+ {desc, <<"number of shard cache evictions">>}
+]}.
+{[mem3, shard_cache, hit], [
+ {type, counter},
+ {desc, <<"number of shard cache hits">>}
+]}.
+{[mem3, shard_cache, miss], [
+ {type, counter},
+ {desc, <<"number of shard cache misses">>}
+]}.
diff --git a/src/mem3/src/mem3.app.src b/src/mem3/src/mem3.app.src
new file mode 100644
index 000000000..99a9eed88
--- /dev/null
+++ b/src/mem3/src/mem3.app.src
@@ -0,0 +1,53 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, mem3, [
+ {description, "CouchDB Cluster Membership"},
+ {vsn, git},
+ {modules, [
+ mem3,
+ mem3_app,
+ mem3_httpd,
+ mem3_nodes,
+ mem3_rep,
+ mem3_shards,
+ mem3_sup,
+ mem3_sync,
+ mem3_sync_event,
+ mem3_sync_nodes,
+ mem3_sync_security,
+ mem3_util
+ ]},
+ {mod, {mem3_app, []}},
+ {registered, [
+ mem3_events,
+ mem3_nodes,
+ mem3_shards,
+ mem3_sync,
+ mem3_sync_nodes,
+ mem3_sup
+ ]},
+ {applications, [
+ kernel,
+ stdlib,
+ config,
+ sasl,
+ crypto,
+ mochiweb,
+ couch_epi,
+ couch,
+ rexi,
+ couch_log,
+ couch_event,
+ couch_stats
+ ]}
+]}.
diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl
new file mode 100644
index 000000000..405d7e5fa
--- /dev/null
+++ b/src/mem3/src/mem3.erl
@@ -0,0 +1,308 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3).
+
+-export([start/0, stop/0, restart/0, nodes/0, node_info/2, shards/1, shards/2,
+ choose_shards/2, n/1, n/2, dbname/1, ushards/1]).
+-export([get_shard/3, local_shards/1, shard_suffix/1, fold_shards/2]).
+-export([sync_security/0, sync_security/1]).
+-export([compare_nodelists/0, compare_shards/1]).
+-export([quorum/1, group_by_proximity/1]).
+-export([live_shards/2]).
+-export([belongs/2]).
+-export([get_placement/1]).
+
+%% For mem3 use only.
+-export([name/1, node/1, range/1]).
+
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+start() ->
+ application:start(mem3).
+
+stop() ->
+ application:stop(mem3).
+
+restart() ->
+ stop(),
+ start().
+
+%% @doc Detailed report of cluster-wide membership state. Queries the state
+%% on all member nodes and builds a dictionary with unique states as the
+%% key and the nodes holding that state as the value. Also reports member
+%% nodes which fail to respond and nodes which are connected but are not
+%% cluster members. Useful for debugging.
+-spec compare_nodelists() -> [{{cluster_nodes, [node()]} | bad_nodes
+ | non_member_nodes, [node()]}].
+compare_nodelists() ->
+ Nodes = mem3:nodes(),
+ AllNodes = erlang:nodes([this, visible]),
+ {Replies, BadNodes} = gen_server:multi_call(Nodes, mem3_nodes, get_nodelist),
+ Dict = lists:foldl(fun({Node, Nodelist}, D) ->
+ orddict:append({cluster_nodes, Nodelist}, Node, D)
+ end, orddict:new(), Replies),
+ [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict].
+
+-spec compare_shards(DbName::iodata()) -> [{bad_nodes | [#shard{}], [node()]}].
+compare_shards(DbName) when is_list(DbName) ->
+ compare_shards(list_to_binary(DbName));
+compare_shards(DbName) ->
+ Nodes = mem3:nodes(),
+ {Replies, BadNodes} = rpc:multicall(mem3, shards, [DbName]),
+ GoodNodes = [N || N <- Nodes, not lists:member(N, BadNodes)],
+ Dict = lists:foldl(fun({Shards, Node}, D) ->
+ orddict:append(Shards, Node, D)
+ end, orddict:new(), lists:zip(Replies, GoodNodes)),
+ [{bad_nodes, BadNodes} | Dict].
+
+-spec n(DbName::iodata()) -> integer().
+n(DbName) ->
+ n(DbName, <<"foo">>).
+
+n(DbName, DocId) ->
+ length(mem3:shards(DbName, DocId)).
+
+-spec nodes() -> [node()].
+nodes() ->
+ mem3_nodes:get_nodelist().
+
+node_info(Node, Key) ->
+ mem3_nodes:get_node_info(Node, Key).
+
+-spec shards(DbName::iodata()) -> [#shard{}].
+shards(DbName) ->
+ shards_int(DbName, []).
+
+shards_int(DbName, Options) when is_list(DbName) ->
+ shards_int(list_to_binary(DbName), Options);
+shards_int(DbName, Options) ->
+ Ordered = lists:member(ordered, Options),
+ ShardDbName =
+ list_to_binary(config:get("mem3", "shards_db", "_dbs")),
+ case DbName of
+ ShardDbName when Ordered ->
+ %% shard_db is treated as a single sharded db to support calls to db_info
+ %% and view_all_docs
+ [#ordered_shard{
+ node = node(),
+ name = ShardDbName,
+ dbname = ShardDbName,
+ range = [0, (2 bsl 31)-1],
+ order = undefined}];
+ ShardDbName ->
+ %% shard_db is treated as a single sharded db to support calls to db_info
+ %% and view_all_docs
+ [#shard{
+ node = node(),
+ name = ShardDbName,
+ dbname = ShardDbName,
+ range = [0, (2 bsl 31)-1]}];
+ _ ->
+ mem3_shards:for_db(DbName, Options)
+ end.
+
+-spec shards(DbName::iodata(), DocId::binary()) -> [#shard{}].
+shards(DbName, DocId) ->
+ shards_int(DbName, DocId, []).
+
+shards_int(DbName, DocId, Options) when is_list(DbName) ->
+ shards_int(list_to_binary(DbName), DocId, Options);
+shards_int(DbName, DocId, Options) when is_list(DocId) ->
+ shards_int(DbName, list_to_binary(DocId), Options);
+shards_int(DbName, DocId, Options) ->
+ mem3_shards:for_docid(DbName, DocId, Options).
+
+
+-spec ushards(DbName::iodata()) -> [#shard{}].
+ushards(DbName) ->
+ Nodes = [node()|erlang:nodes()],
+ ZoneMap = zone_map(Nodes),
+ Shards = ushards(DbName, live_shards(DbName, Nodes, [ordered]), ZoneMap),
+ mem3_util:downcast(Shards).
+
+ushards(DbName, Shards0, ZoneMap) ->
+ {L,S,D} = group_by_proximity(Shards0, ZoneMap),
+ % Prefer shards in the local zone over shards in a different zone,
+ % but sort each zone separately to ensure a consistent choice between
+ % nodes in the same zone.
+ Shards = choose_ushards(DbName, L ++ S) ++ choose_ushards(DbName, D),
+ lists:ukeysort(#shard.range, Shards).
+
+get_shard(DbName, Node, Range) ->
+ mem3_shards:get(DbName, Node, Range).
+
+local_shards(DbName) ->
+ mem3_shards:local(DbName).
+
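+%% Returns the timestamp suffix of a database's shard files, e.g.
+%% ".1491066000" when the first shard is named
+%% <<"shards/00000000-3fffffff/db1.1491066000">> (values illustrative).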
+shard_suffix(#db{name=DbName}) ->
+ shard_suffix(DbName);
+shard_suffix(DbName0) ->
+ Shard = hd(shards(DbName0)),
+ <<"shards/", _:8/binary, "-", _:8/binary, "/", DbName/binary>> =
+ Shard#shard.name,
+ filename:extension(binary_to_list(DbName)).
+
+fold_shards(Fun, Acc) ->
+ mem3_shards:fold(Fun, Acc).
+
+sync_security() ->
+ mem3_sync_security:go().
+
+sync_security(Db) ->
+ mem3_sync_security:go(dbname(Db)).
+
+-spec choose_shards(DbName::iodata(), Options::list()) -> [#shard{}].
+choose_shards(DbName, Options) when is_list(DbName) ->
+ choose_shards(list_to_binary(DbName), Options);
+choose_shards(DbName, Options) ->
+ try shards(DbName)
+ catch error:E when E==database_does_not_exist; E==badarg ->
+ Nodes = allowed_nodes(),
+ case get_placement(Options) of
+ undefined ->
+ choose_shards(DbName, Nodes, Options);
+ Placement ->
+ lists:flatmap(fun({Zone, N}) ->
+ NodesInZone = nodes_in_zone(Nodes, Zone),
+ Options1 = lists:keymerge(1, [{n,N}], Options),
+ choose_shards(DbName, NodesInZone, Options1)
+ end, Placement)
+ end
+ end.
+
+choose_shards(DbName, Nodes, Options) ->
+ NodeCount = length(Nodes),
+ Suffix = couch_util:get_value(shard_suffix, Options, ""),
+ N = mem3_util:n_val(couch_util:get_value(n, Options), NodeCount),
+ if N =:= 0 -> erlang:error(no_nodes_in_zone);
+ true -> ok
+ end,
+ Q = mem3_util:to_integer(couch_util:get_value(q, Options,
+ config:get("cluster", "q", "8"))),
+ %% rotate to a random entry in the nodelist for even distribution
+ {A, B} = lists:split(crypto:rand_uniform(1,length(Nodes)+1), Nodes),
+ RotatedNodes = B ++ A,
+ mem3_util:create_partition_map(DbName, N, Q, RotatedNodes, Suffix).
+
+get_placement(Options) ->
+ case couch_util:get_value(placement, Options) of
+ undefined ->
+ case config:get("cluster", "placement") of
+ undefined ->
+ undefined;
+ PlacementStr ->
+ decode_placement_string(PlacementStr)
+ end;
+ PlacementStr ->
+ decode_placement_string(PlacementStr)
+ end.
+
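+%% Parses a placement rule string such as "metro-dc-a:2,metro-dc-b:1"
+%% into [{<<"metro-dc-a">>, 2}, {<<"metro-dc-b">>, 1}] (zone names are
+%% illustrative).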
+decode_placement_string(PlacementStr) ->
+ [begin
+ [Zone, N] = string:tokens(Rule, ":"),
+ {list_to_binary(Zone), list_to_integer(N)}
+ end || Rule <- string:tokens(PlacementStr, ",")].
+
+-spec dbname(#shard{} | iodata()) -> binary().
+dbname(#shard{dbname = DbName}) ->
+ DbName;
+dbname(<<"shards/", _:8/binary, "-", _:8/binary, "/", DbName/binary>>) ->
+ list_to_binary(filename:rootname(binary_to_list(DbName)));
+dbname(DbName) when is_list(DbName) ->
+ dbname(list_to_binary(DbName));
+dbname(DbName) when is_binary(DbName) ->
+ DbName;
+dbname(_) ->
+ erlang:error(badarg).
+
+%% @doc Determine if DocId belongs in shard (identified by record or filename)
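+%% e.g. belongs(<<"shards/00000000-1fffffff/testdb.1491066000">>, DocId)
+%% hashes DocId and checks it against the 00000000-1fffffff range
+%% (shard name illustrative).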
+belongs(#shard{}=Shard, DocId) when is_binary(DocId) ->
+ [Begin, End] = range(Shard),
+ belongs(Begin, End, DocId);
+belongs(<<"shards/", _/binary>> = ShardName, DocId) when is_binary(DocId) ->
+ [Begin, End] = range(ShardName),
+ belongs(Begin, End, DocId);
+belongs(DbName, DocId) when is_binary(DbName), is_binary(DocId) ->
+ true.
+
+belongs(Begin, End, DocId) ->
+ HashKey = mem3_util:hash(DocId),
+ Begin =< HashKey andalso HashKey =< End.
+
+range(#shard{range = Range}) ->
+ Range;
+range(#ordered_shard{range = Range}) ->
+ Range;
+range(<<"shards/", Start:8/binary, "-", End:8/binary, "/", _/binary>>) ->
+ [httpd_util:hexlist_to_integer(binary_to_list(Start)),
+ httpd_util:hexlist_to_integer(binary_to_list(End))].
+
+allowed_nodes() ->
+ [Node || Node <- mem3:nodes(), mem3:node_info(Node, <<"decom">>) =/= true].
+
+nodes_in_zone(Nodes, Zone) ->
+ [Node || Node <- Nodes, Zone == mem3:node_info(Node, <<"zone">>)].
+
+live_shards(DbName, Nodes) ->
+ live_shards(DbName, Nodes, []).
+
+live_shards(DbName, Nodes, Options) ->
+ [S || S <- shards_int(DbName, Options), lists:member(mem3:node(S), Nodes)].
+
+zone_map(Nodes) ->
+ [{Node, node_info(Node, <<"zone">>)} || Node <- Nodes].
+
+group_by_proximity(Shards) ->
+ Nodes = [mem3:node(S) || S <- lists:ukeysort(#shard.node, Shards)],
+ group_by_proximity(Shards, zone_map(Nodes)).
+
+group_by_proximity(Shards, ZoneMap) ->
+ {Local, Remote} = lists:partition(fun(S) -> mem3:node(S) =:= node() end,
+ Shards),
+ LocalZone = proplists:get_value(node(), ZoneMap),
+ Fun = fun(S) -> proplists:get_value(mem3:node(S), ZoneMap) =:= LocalZone end,
+ {SameZone, DifferentZone} = lists:partition(Fun, Remote),
+ {Local, SameZone, DifferentZone}.
+
+choose_ushards(DbName, Shards) ->
+ Groups0 = group_by_range(Shards),
+ Groups1 = [mem3_util:rotate_list({DbName, R}, order_shards(G))
+ || {R, G} <- Groups0],
+ [hd(G) || G <- Groups1].
+
+order_shards([#ordered_shard{}|_]=OrderedShards) ->
+ lists:keysort(#ordered_shard.order, OrderedShards);
+order_shards(UnorderedShards) ->
+ UnorderedShards.
+
+group_by_range(Shards) ->
+ lists:foldl(fun(Shard, Dict) ->
+ orddict:append(mem3:range(Shard), Shard, Dict) end, orddict:new(), Shards).
+
+% quorum functions
+
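+%% A read/write quorum is a strict majority of the n copies,
+%% e.g. with the default n=3, quorum/1 returns 2.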
+quorum(#db{name=DbName}) ->
+ quorum(DbName);
+quorum(DbName) ->
+ n(DbName) div 2 + 1.
+
+node(#shard{node=Node}) ->
+ Node;
+node(#ordered_shard{node=Node}) ->
+ Node.
+
+name(#shard{name=Name}) ->
+ Name;
+name(#ordered_shard{name=Name}) ->
+ Name.
diff --git a/src/mem3/src/mem3_app.erl b/src/mem3/src/mem3_app.erl
new file mode 100644
index 000000000..3ddfbe6fd
--- /dev/null
+++ b/src/mem3/src/mem3_app.erl
@@ -0,0 +1,21 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_app).
+-behaviour(application).
+-export([start/2, stop/1]).
+
+start(_Type, []) ->
+ mem3_sup:start_link().
+
+stop([]) ->
+ ok.
diff --git a/src/mem3/src/mem3_epi.erl b/src/mem3/src/mem3_epi.erl
new file mode 100644
index 000000000..ebcd596b6
--- /dev/null
+++ b/src/mem3/src/mem3_epi.erl
@@ -0,0 +1,50 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-module(mem3_epi).
+
+-behaviour(couch_epi_plugin).
+
+-export([
+ app/0,
+ providers/0,
+ services/0,
+ data_subscriptions/0,
+ data_providers/0,
+ processes/0,
+ notify/3
+]).
+
+app() ->
+ mem3.
+
+providers() ->
+ [
+ {chttpd_handlers, mem3_httpd_handlers}
+ ].
+
+
+services() ->
+ [].
+
+data_subscriptions() ->
+ [].
+
+data_providers() ->
+ [].
+
+processes() ->
+ [].
+
+notify(_Key, _Old, _New) ->
+ ok.
diff --git a/src/mem3/src/mem3_httpd.erl b/src/mem3/src/mem3_httpd.erl
new file mode 100644
index 000000000..535815862
--- /dev/null
+++ b/src/mem3/src/mem3_httpd.erl
@@ -0,0 +1,66 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_httpd).
+
+-export([handle_membership_req/1, handle_shards_req/2]).
+
+%% includes
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+handle_membership_req(#httpd{method='GET',
+ path_parts=[<<"_membership">>]} = Req) ->
+ ClusterNodes = try mem3:nodes()
+ % fall back to a plain list; the result is passed to lists:sort/1 below
+ catch _:_ -> [] end,
+ couch_httpd:send_json(Req, {[
+ {all_nodes, lists:sort([node()|nodes()])},
+ {cluster_nodes, lists:sort(ClusterNodes)}
+ ]});
+handle_membership_req(#httpd{path_parts=[<<"_membership">>]}=Req) ->
+ chttpd:send_method_not_allowed(Req, "GET").
+
+handle_shards_req(#httpd{method='GET',
+ path_parts=[_DbName, <<"_shards">>]} = Req, Db) ->
+ DbName = mem3:dbname(Db#db.name),
+ Shards = mem3:shards(DbName),
+ JsonShards = json_shards(Shards, dict:new()),
+ couch_httpd:send_json(Req, {[
+ {shards, JsonShards}
+ ]});
+handle_shards_req(#httpd{method='GET',
+ path_parts=[_DbName, <<"_shards">>, DocId]} = Req, Db) ->
+ DbName = mem3:dbname(Db#db.name),
+ Shards = mem3:shards(DbName, DocId),
+ {[{Shard, Dbs}]} = json_shards(Shards, dict:new()),
+ couch_httpd:send_json(Req, {[
+ {range, Shard},
+ {nodes, Dbs}
+ ]});
+handle_shards_req(#httpd{path_parts=[_DbName, <<"_shards">>]}=Req, _Db) ->
+ chttpd:send_method_not_allowed(Req, "GET");
+handle_shards_req(#httpd{path_parts=[_DbName, <<"_shards">>, _DocId]}=Req, _Db) ->
+ chttpd:send_method_not_allowed(Req, "GET").
+
+%%
+%% internal
+%%
+
+json_shards([], AccIn) ->
+ List = dict:to_list(AccIn),
+ {lists:sort(List)};
+json_shards([#shard{node=Node, range=[B,E]} | Rest], AccIn) ->
+ HexBeg = couch_util:to_hex(<<B:32/integer>>),
+ HexEnd = couch_util:to_hex(<<E:32/integer>>),
+ Range = list_to_binary(HexBeg ++ "-" ++ HexEnd),
+ json_shards(Rest, dict:append(Range, Node, AccIn)).
diff --git a/src/mem3/src/mem3_httpd_handlers.erl b/src/mem3/src/mem3_httpd_handlers.erl
new file mode 100644
index 000000000..d8e138c15
--- /dev/null
+++ b/src/mem3/src/mem3_httpd_handlers.erl
@@ -0,0 +1,23 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_httpd_handlers).
+
+-export([url_handler/1, db_handler/1, design_handler/1]).
+
+url_handler(<<"_membership">>) -> fun mem3_httpd:handle_membership_req/1;
+url_handler(_) -> no_match.
+
+db_handler(<<"_shards">>) -> fun mem3_httpd:handle_shards_req/2;
+db_handler(_) -> no_match.
+
+design_handler(_) -> no_match.
diff --git a/src/mem3/src/mem3_nodes.erl b/src/mem3/src/mem3_nodes.erl
new file mode 100644
index 000000000..f31891a7b
--- /dev/null
+++ b/src/mem3/src/mem3_nodes.erl
@@ -0,0 +1,146 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_nodes).
+-behaviour(gen_server).
+-vsn(1).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0, get_nodelist/0, get_node_info/2]).
+
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-record(state, {changes_pid, update_seq}).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_nodelist() ->
+ try
+ lists:sort([N || {N,_} <- ets:tab2list(?MODULE)])
+ catch error:badarg ->
+ gen_server:call(?MODULE, get_nodelist)
+ end.
+
+get_node_info(Node, Key) ->
+ try
+ couch_util:get_value(Key, ets:lookup_element(?MODULE, Node, 2))
+ catch error:badarg ->
+ gen_server:call(?MODULE, {get_node_info, Node, Key})
+ end.
+
+init([]) ->
+ ets:new(?MODULE, [named_table, {read_concurrency, true}]),
+ UpdateSeq = initialize_nodelist(),
+ {Pid, _} = spawn_monitor(fun() -> listen_for_changes(UpdateSeq) end),
+ {ok, #state{changes_pid = Pid, update_seq = UpdateSeq}}.
+
+handle_call(get_nodelist, _From, State) ->
+ {reply, lists:sort([N || {N,_} <- ets:tab2list(?MODULE)]), State};
+handle_call({get_node_info, Node, Key}, _From, State) ->
+ Resp = try
+ couch_util:get_value(Key, ets:lookup_element(?MODULE, Node, 2))
+ catch error:badarg ->
+ error
+ end,
+ {reply, Resp, State};
+handle_call({add_node, Node, NodeInfo}, _From, State) ->
+ gen_event:notify(mem3_events, {add_node, Node}),
+ ets:insert(?MODULE, {Node, NodeInfo}),
+ {reply, ok, State};
+handle_call({remove_node, Node}, _From, State) ->
+ gen_event:notify(mem3_events, {remove_node, Node}),
+ ets:delete(?MODULE, Node),
+ {reply, ok, State};
+handle_call(_Call, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) ->
+ couch_log:notice("~p changes listener died ~p", [?MODULE, Reason]),
+ StartSeq = State#state.update_seq,
+ Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> StartSeq end,
+ erlang:send_after(5000, self(), start_listener),
+ {noreply, State#state{update_seq = Seq}};
+handle_info(start_listener, #state{update_seq = Seq} = State) ->
+ {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
+ {noreply, State#state{changes_pid=NewPid}};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, #state{}=State, _Extra) ->
+ {ok, State}.
+
+%% internal functions
+
+initialize_nodelist() ->
+ DbName = config:get("mem3", "nodes_db", "_nodes"),
+ {ok, Db} = mem3_util:ensure_exists(DbName),
+ {ok, _, Db} = couch_btree:fold(Db#db.id_tree, fun first_fold/3, Db, []),
+ % add self if not already present
+ case ets:lookup(?MODULE, node()) of
+ [_] ->
+ ok;
+ [] ->
+ ets:insert(?MODULE, {node(), []}),
+ Doc = #doc{id = couch_util:to_binary(node())},
+ {ok, _} = couch_db:update_doc(Db, Doc, [])
+ end,
+ couch_db:close(Db),
+ Db#db.update_seq.
+
+first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, _, Acc) ->
+ {ok, Acc};
+first_fold(#full_doc_info{deleted=true}, _, Acc) ->
+ {ok, Acc};
+first_fold(#full_doc_info{id=Id}=DocInfo, _, Db) ->
+ {ok, #doc{body={Props}}} = couch_db:open_doc(Db, DocInfo, [ejson_body]),
+ ets:insert(?MODULE, {mem3_util:to_atom(Id), Props}),
+ {ok, Db}.
+
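+%% Follow the nodes db's changes feed continuously; each change adds or
+%% removes a cluster member in the ETS table via the gen_server calls in
+%% changes_callback/2.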
+listen_for_changes(Since) ->
+ DbName = config:get("mem3", "nodes_db", "_nodes"),
+ {ok, Db} = mem3_util:ensure_exists(DbName),
+ Args = #changes_args{
+ feed = "continuous",
+ since = Since,
+ heartbeat = true,
+ include_docs = true
+ },
+ ChangesFun = couch_changes:handle_db_changes(Args, nil, Db),
+ ChangesFun(fun changes_callback/2).
+
+changes_callback(start, _) ->
+ {ok, nil};
+changes_callback({stop, EndSeq}, _) ->
+ exit({seq, EndSeq});
+changes_callback({change, {Change}, _}, _) ->
+ Node = couch_util:get_value(<<"id">>, Change),
+ case Node of <<"_design/", _/binary>> -> ok; _ ->
+ case mem3_util:is_deleted(Change) of
+ false ->
+ {Props} = couch_util:get_value(doc, Change),
+ gen_server:call(?MODULE, {add_node, mem3_util:to_atom(Node), Props});
+ true ->
+ gen_server:call(?MODULE, {remove_node, mem3_util:to_atom(Node)})
+ end
+ end,
+ {ok, couch_util:get_value(<<"seq">>, Change)};
+changes_callback(timeout, _) ->
+ {ok, nil}.
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
new file mode 100644
index 000000000..ad7ac55f5
--- /dev/null
+++ b/src/mem3/src/mem3_rep.erl
@@ -0,0 +1,481 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_rep).
+
+
+-export([
+ go/2,
+ go/3,
+ make_local_id/2,
+ find_source_seq/4
+]).
+
+-export([
+ changes_enumerator/3
+]).
+
+
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-record(acc, {
+ batch_size,
+ batch_count,
+ revcount = 0,
+ infos = [],
+ seq = 0,
+ localid,
+ source,
+ target,
+ filter,
+ db,
+ history = {[]}
+}).
+
+
+go(Source, Target) ->
+ go(Source, Target, []).
+
+
+go(DbName, Node, Opts) when is_binary(DbName), is_atom(Node) ->
+ go(#shard{name=DbName, node=node()}, #shard{name=DbName, node=Node}, Opts);
+
+
+go(#shard{} = Source, #shard{} = Target, Opts) ->
+ mem3_sync_security:maybe_sync(Source, Target),
+ BatchSize = case proplists:get_value(batch_size, Opts) of
+ BS when is_integer(BS), BS > 0 -> BS;
+ _ -> 100
+ end,
+ BatchCount = case proplists:get_value(batch_count, Opts) of
+ all -> all;
+ BC when is_integer(BC), BC > 0 -> BC;
+ _ -> 1
+ end,
+ Filter = proplists:get_value(filter, Opts),
+ Acc = #acc{
+ batch_size = BatchSize,
+ batch_count = BatchCount,
+ source = Source,
+ target = Target,
+ filter = Filter
+ },
+ go(Acc).
+
+
+go(#acc{source=Source, batch_count=BC}=Acc0) ->
+ case couch_db:open(Source#shard.name, [?ADMIN_CTX]) of
+ {ok, Db} ->
+ Acc = Acc0#acc{db=Db},
+ Resp = try
+ repl(Db, Acc)
+ catch error:{not_found, no_db_file} ->
+ {error, missing_target}
+ after
+ couch_db:close(Db)
+ end,
+ case Resp of
+ {ok, P} when P > 0, BC == all ->
+ go(Acc);
+ {ok, P} when P > 0, BC > 1 ->
+ go(Acc#acc{batch_count=BC-1});
+ Else ->
+ Else
+ end;
+ {not_found, no_db_file} ->
+ {error, missing_source}
+ end.
+
+
+make_local_id(Source, Target) ->
+ make_local_id(Source, Target, undefined).
+
+
+make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) ->
+ make_local_id(SourceNode, TargetNode, Filter);
+
+
+make_local_id(SourceThing, TargetThing, Filter) ->
+ S = couch_util:encodeBase64Url(couch_crypto:hash(md5, term_to_binary(SourceThing))),
+ T = couch_util:encodeBase64Url(couch_crypto:hash(md5, term_to_binary(TargetThing))),
+ F = case is_function(Filter) of
+ true ->
+ {new_uniq, Hash} = erlang:fun_info(Filter, new_uniq),
+ B = couch_util:encodeBase64Url(Hash),
+ <<"-", B/binary>>;
+ false ->
+ <<>>
+ end,
+ <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
+
+
+%% @doc Find and return the largest update_seq in SourceDb
+%% that the client has seen from TargetNode.
+%%
+%% When reasoning about this function it is very important to
+%% understand the direction of replication for this comparison.
+%% We're only interested in internal replications initiated
+%% by this node to the node being replaced. When doing a
+%% replacement the most important thing is that the client doesn't
+%% miss any updates. This means we can only fast-forward as far
+%% as they've seen updates on this node. We can detect that by
+%% looking for our push replication history and choosing the
+%% largest source_seq that has a target_seq =< TgtSeq.
+find_source_seq(SrcDb, TgtNode, TgtUUIDPrefix, TgtSeq) ->
+ case find_repl_doc(SrcDb, TgtUUIDPrefix) of
+ {ok, TgtUUID, Doc} ->
+ SrcNode = atom_to_binary(node(), utf8),
+ find_source_seq_int(Doc, SrcNode, TgtNode, TgtUUID, TgtSeq);
+ {not_found, _} ->
+ 0
+ end.
+
+
+find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
+ SrcNode = case is_atom(SrcNode0) of
+ true -> atom_to_binary(SrcNode0, utf8);
+ false -> SrcNode0
+ end,
+ TgtNode = case is_atom(TgtNode0) of
+ true -> atom_to_binary(TgtNode0, utf8);
+ false -> TgtNode0
+ end,
+ % This is split off purely for the ability to run unit tests
+ % against this bit of code without requiring all sorts of mocks.
+ {History} = couch_util:get_value(<<"history">>, Props, {[]}),
+ SrcHistory = couch_util:get_value(SrcNode, History, []),
+ UseableHistory = lists:filter(fun({Entry}) ->
+ couch_util:get_value(<<"target_node">>, Entry) =:= TgtNode andalso
+ couch_util:get_value(<<"target_uuid">>, Entry) =:= TgtUUID andalso
+ couch_util:get_value(<<"target_seq">>, Entry) =< TgtSeq
+ end, SrcHistory),
+
+ % This relies on SrcHistory being ordered descending by source
+ % sequence.
+ case UseableHistory of
+ [{Entry} | _] ->
+ couch_util:get_value(<<"source_seq">>, Entry);
+ [] ->
+ 0
+ end.
+
+
+repl(#db{name=DbName, seq_tree=Bt}=Db, Acc0) ->
+ erlang:put(io_priority, {internal_repl, DbName}),
+ #acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0#acc{source = Db}),
+ Fun = fun ?MODULE:changes_enumerator/3,
+ {ok, _, Acc2} = couch_btree:fold(Bt, Fun, Acc1, [{start_key, Seq + 1}]),
+ {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
+ {ok, couch_db:count_changes_since(Db, LastSeq)}.
+
+
+calculate_start_seq(Acc) ->
+ #acc{
+ source = Db,
+ target = #shard{node=Node, name=Name}
+ } = Acc,
+ %% Give the target our UUID and ask it to return the checkpoint doc
+ UUID = couch_db:get_uuid(Db),
+ {NewDocId, Doc} = mem3_rpc:load_checkpoint(Node, Name, node(), UUID),
+ #doc{id=FoundId, body={TProps}} = Doc,
+ Acc1 = Acc#acc{localid = NewDocId},
+ % NewDocId and FoundId may be different the first time
+ % this code runs to save our newly named internal replication
+ % checkpoints. We store NewDocId to use when saving checkpoints
+ % but use FoundId to reuse the same docid that the target used.
+ case couch_db:open_doc(Db, FoundId, [ejson_body]) of
+ {ok, #doc{body = {SProps}}} ->
+ SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0),
+ TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0),
+ % We resume from the lower update seq stored in the two
+ % shard copies. We also need to be sure and use the
+ % corresponding history. A difference here could result
+ % from either a write failure on one of the nodes or if
+ % either shard was truncated by an operator.
+ case SourceSeq =< TargetSeq of
+ true ->
+ Seq = SourceSeq,
+ History = couch_util:get_value(<<"history">>, SProps, {[]});
+ false ->
+ Seq = TargetSeq,
+ History = couch_util:get_value(<<"history">>, TProps, {[]})
+ end,
+ Acc1#acc{seq = Seq, history = History};
+ {not_found, _} ->
+ compare_epochs(Acc1)
+ end.
+
+compare_epochs(Acc) ->
+ #acc{
+ source = Db,
+ target = #shard{node=Node, name=Name}
+ } = Acc,
+ UUID = couch_db:get_uuid(Db),
+ Epochs = couch_db:get_epochs(Db),
+ Seq = mem3_rpc:find_common_seq(Node, Name, UUID, Epochs),
+ Acc#acc{seq = Seq, history = {[]}}.
+
+changes_enumerator(#doc_info{id=DocId}, Reds, #acc{db=Db}=Acc) ->
+ {ok, FDI} = couch_db:get_full_doc_info(Db, DocId),
+ changes_enumerator(FDI, Reds, Acc);
+changes_enumerator(#full_doc_info{}=FDI, _,
+ #acc{revcount=C, infos=Infos}=Acc0) ->
+ #doc_info{
+ high_seq=Seq,
+ revs=Revs
+ } = couch_doc:to_doc_info(FDI),
+ {Count, NewInfos} = case filter_doc(Acc0#acc.filter, FDI) of
+ keep -> {C + length(Revs), [FDI | Infos]};
+ discard -> {C, Infos}
+ end,
+ Acc1 = Acc0#acc{
+ seq=Seq,
+ revcount=Count,
+ infos=NewInfos
+ },
+ Go = if Count < Acc1#acc.batch_size -> ok; true -> stop end,
+ {Go, Acc1}.
+
+
+replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) ->
+ case find_missing_revs(Acc) of
+ [] ->
+ ok;
+ Missing ->
+ lists:map(fun(Chunk) ->
+ Docs = open_docs(Acc, Chunk),
+ ok = save_on_target(Node, Name, Docs)
+ end, chunk_revs(Missing))
+ end,
+ update_locals(Acc),
+ {ok, Acc#acc{revcount=0, infos=[]}}.
+
+
+find_missing_revs(Acc) ->
+ #acc{target = #shard{node=Node, name=Name}, infos = Infos} = Acc,
+ IdsRevs = lists:map(fun(FDI) ->
+ #doc_info{id=Id, revs=RevInfos} = couch_doc:to_doc_info(FDI),
+ {Id, [R || #rev_info{rev=R} <- RevInfos]}
+ end, Infos),
+ mem3_rpc:get_missing_revs(Node, Name, IdsRevs, [
+ {io_priority, {internal_repl, Name}},
+ ?ADMIN_CTX
+ ]).
+
+
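+%% Split a list of {Id, Revs, PossibleAncestors} entries into batches that
+%% carry at most Limit revs each; a single document's revs may be split
+%% across consecutive batches.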
+chunk_revs(Revs) ->
+ Limit = list_to_integer(config:get("mem3", "rev_chunk_size", "5000")),
+ chunk_revs(Revs, Limit).
+
+chunk_revs(Revs, Limit) ->
+ chunk_revs(Revs, {0, []}, [], Limit).
+
+chunk_revs([], {_Count, Chunk}, Chunks, _Limit) ->
+ [Chunk|Chunks];
+chunk_revs([{Id, R, A}|Revs], {Count, Chunk}, Chunks, Limit) when length(R) =< Limit - Count ->
+ chunk_revs(
+ Revs,
+ {Count + length(R), [{Id, R, A}|Chunk]},
+ Chunks,
+ Limit
+ );
+chunk_revs([{Id, R, A}|Revs], {Count, Chunk}, Chunks, Limit) ->
+ {This, Next} = lists:split(Limit - Count, R),
+ chunk_revs(
+ [{Id, Next, A}|Revs],
+ {0, []},
+ [[{Id, This, A}|Chunk]|Chunks],
+ Limit
+ ).
+
+
+open_docs(#acc{source=Source, infos=Infos}, Missing) ->
+ lists:flatmap(fun({Id, Revs, _}) ->
+ FDI = lists:keyfind(Id, #full_doc_info.id, Infos),
+ #full_doc_info{rev_tree=RevTree} = FDI,
+ {FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs),
+ lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) ->
+ couch_db:make_doc(Source, Id, IsDel, SummaryPtr, FoundRevPath)
+ end, FoundRevs)
+ end, Missing).
+
+
+save_on_target(Node, Name, Docs) ->
+ mem3_rpc:update_docs(Node, Name, Docs, [
+ replicated_changes,
+ full_commit,
+ ?ADMIN_CTX,
+ {io_priority, {internal_repl, Name}}
+ ]),
+ ok.
+
+
+update_locals(Acc) ->
+ #acc{seq=Seq, source=Db, target=Target, localid=Id, history=History} = Acc,
+ #shard{name=Name, node=Node} = Target,
+ NewEntry = [
+ {<<"source_node">>, atom_to_binary(node(), utf8)},
+ {<<"source_uuid">>, couch_db:get_uuid(Db)},
+ {<<"source_seq">>, Seq},
+ {<<"timestamp">>, list_to_binary(iso8601_timestamp())}
+ ],
+ NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, NewEntry, History),
+ {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
+
+
+find_repl_doc(SrcDb, TgtUUIDPrefix) ->
+ SrcUUID = couch_db:get_uuid(SrcDb),
+ S = couch_util:encodeBase64Url(couch_crypto:hash(md5, term_to_binary(SrcUUID))),
+ DocIdPrefix = <<"_local/shard-sync-", S/binary, "-">>,
+ FoldFun = fun({DocId, {Rev0, {BodyProps}}}, _, _) ->
+ TgtUUID = couch_util:get_value(<<"target_uuid">>, BodyProps, <<>>),
+ case is_prefix(DocIdPrefix, DocId) of
+ true ->
+ case is_prefix(TgtUUIDPrefix, TgtUUID) of
+ true ->
+ Rev = list_to_binary(integer_to_list(Rev0)),
+ Doc = #doc{id=DocId, revs={0, [Rev]}, body={BodyProps}},
+ {stop, {TgtUUID, Doc}};
+ false ->
+ {ok, not_found}
+ end;
+ _ ->
+ {stop, not_found}
+ end
+ end,
+ Options = [{start_key, DocIdPrefix}],
+ case couch_btree:fold(SrcDb#db.local_tree, FoldFun, not_found, Options) of
+ {ok, _, {TgtUUID, Doc}} ->
+ {ok, TgtUUID, Doc};
+ {ok, _, not_found} ->
+ {not_found, missing};
+ Else ->
+ couch_log:error("Error finding replication doc: ~w", [Else]),
+ {not_found, missing}
+ end.
+
+
+is_prefix(Prefix, Subject) ->
+ binary:longest_common_prefix([Prefix, Subject]) == size(Prefix).
+
+
+filter_doc(Filter, FullDocInfo) when is_function(Filter) ->
+ try Filter(FullDocInfo) of
+ discard -> discard;
+ _ -> keep
+ catch _:_ ->
+ keep
+ end;
+filter_doc(_, _) ->
+ keep.
+
+
+iso8601_timestamp() ->
+ {_,_,Micro} = Now = os:timestamp(),
+ {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now),
+ Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ",
+ io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]).
+
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+
+find_source_seq_unknown_node_test() ->
+ ?assertEqual(
+ find_source_seq_int(doc_(), <<"foo">>, <<"bing">>, <<"bar_uuid">>, 10),
+ 0
+ ).
+
+
+find_source_seq_unknown_uuid_test() ->
+ ?assertEqual(
+ find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"teapot">>, 10),
+ 0
+ ).
+
+
+find_source_seq_ok_test() ->
+ ?assertEqual(
+ find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"bar_uuid">>, 100),
+ 100
+ ).
+
+
+find_source_seq_old_ok_test() ->
+ ?assertEqual(
+ find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"bar_uuid">>, 84),
+ 50
+ ).
+
+
+find_source_seq_different_node_test() ->
+ ?assertEqual(
+ find_source_seq_int(doc_(), <<"foo2">>, <<"bar">>, <<"bar_uuid">>, 92),
+ 31
+ ).
+
+
+-define(SNODE, <<"source_node">>).
+-define(SUUID, <<"source_uuid">>).
+-define(SSEQ, <<"source_seq">>).
+-define(TNODE, <<"target_node">>).
+-define(TUUID, <<"target_uuid">>).
+-define(TSEQ, <<"target_seq">>).
+
+doc_() ->
+ Foo_Bar = [
+ {[
+ {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 100},
+ {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 100}
+ ]},
+ {[
+ {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 90},
+ {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 85}
+ ]},
+ {[
+ {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 50},
+ {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 51}
+ ]},
+ {[
+ {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 40},
+ {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 45}
+ ]},
+ {[
+ {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 2},
+ {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 2}
+ ]}
+ ],
+ Foo2_Bar = [
+ {[
+ {?SNODE, <<"foo2">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 100},
+ {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 100}
+ ]},
+ {[
+ {?SNODE, <<"foo2">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 92},
+ {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 93}
+ ]},
+ {[
+ {?SNODE, <<"foo2">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 31},
+ {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 30}
+ ]}
+ ],
+ History = {[
+ {<<"foo">>, Foo_Bar},
+ {<<"foo2">>, Foo2_Bar}
+ ]},
+ #doc{
+ body={[{<<"history">>, History}]}
+ }.
+
+-endif.
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
new file mode 100644
index 000000000..93cb99ac9
--- /dev/null
+++ b/src/mem3/src/mem3_rpc.erl
@@ -0,0 +1,586 @@
+% Copyright 2013 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_rpc).
+
+
+-export([
+ find_common_seq/4,
+ get_missing_revs/4,
+ update_docs/4,
+ load_checkpoint/4,
+ save_checkpoint/6
+]).
+
+% Private RPC callbacks
+-export([
+ find_common_seq_rpc/3,
+ load_checkpoint_rpc/3,
+ save_checkpoint_rpc/5
+]).
+
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+get_missing_revs(Node, DbName, IdsRevs, Options) ->
+ rexi_call(Node, {fabric_rpc, get_missing_revs, [DbName, IdsRevs, Options]}).
+
+
+update_docs(Node, DbName, Docs, Options) ->
+ rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}).
+
+
+load_checkpoint(Node, DbName, SourceNode, SourceUUID) ->
+ Args = [DbName, SourceNode, SourceUUID],
+ rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}).
+
+
+save_checkpoint(Node, DbName, DocId, Seq, Entry, History) ->
+ Args = [DbName, DocId, Seq, Entry, History],
+ rexi_call(Node, {mem3_rpc, save_checkpoint_rpc, Args}).
+
+
+find_common_seq(Node, DbName, SourceUUID, SourceEpochs) ->
+ Args = [DbName, SourceUUID, SourceEpochs],
+ rexi_call(Node, {mem3_rpc, find_common_seq_rpc, Args}).
+
+
+load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
+ erlang:put(io_priority, {internal_repl, DbName}),
+ case get_or_create_db(DbName, [?ADMIN_CTX]) of
+ {ok, Db} ->
+ TargetUUID = couch_db:get_uuid(Db),
+ NewId = mem3_rep:make_local_id(SourceUUID, TargetUUID),
+ case couch_db:open_doc(Db, NewId, []) of
+ {ok, Doc} ->
+ rexi:reply({ok, {NewId, Doc}});
+ {not_found, _} ->
+ OldId = mem3_rep:make_local_id(SourceNode, node()),
+ case couch_db:open_doc(Db, OldId, []) of
+ {ok, Doc} ->
+ rexi:reply({ok, {NewId, Doc}});
+ {not_found, _} ->
+ rexi:reply({ok, {NewId, #doc{id = NewId}}})
+ end
+ end;
+ Error ->
+ rexi:reply(Error)
+ end.
+
+
+save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) ->
+ erlang:put(io_priority, {internal_repl, DbName}),
+ case get_or_create_db(DbName, [?ADMIN_CTX]) of
+ {ok, #db{update_seq = TargetSeq} = Db} ->
+ NewEntry = {[
+ {<<"target_node">>, atom_to_binary(node(), utf8)},
+ {<<"target_uuid">>, couch_db:get_uuid(Db)},
+ {<<"target_seq">>, TargetSeq}
+ ] ++ NewEntry0},
+ Body = {[
+ {<<"seq">>, SourceSeq},
+ {<<"target_uuid">>, couch_db:get_uuid(Db)},
+ {<<"history">>, add_checkpoint(NewEntry, History0)}
+ ]},
+ Doc = #doc{id = Id, body = Body},
+ rexi:reply(try couch_db:update_doc(Db, Doc, []) of
+ {ok, _} ->
+ {ok, Body};
+ Else ->
+ {error, Else}
+ catch
+ Exception ->
+ Exception;
+ error:Reason ->
+ {error, Reason}
+ end);
+ Error ->
+ rexi:reply(Error)
+ end.
+
+find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) ->
+ erlang:put(io_priority, {internal_repl, DbName}),
+ case get_or_create_db(DbName, [?ADMIN_CTX]) of
+ {ok, Db} ->
+ case couch_db:get_uuid(Db) of
+ SourceUUID ->
+ TargetEpochs = couch_db:get_epochs(Db),
+ Seq = compare_epochs(SourceEpochs, TargetEpochs),
+ rexi:reply({ok, Seq});
+ _Else ->
+ rexi:reply({ok, 0})
+ end;
+ Error ->
+ rexi:reply(Error)
+ end.
+
+
+%% @doc Return the sequence where two files with the same UUID diverged.
+compare_epochs(SourceEpochs, TargetEpochs) ->
+ compare_rev_epochs(
+ lists:reverse(SourceEpochs),
+ lists:reverse(TargetEpochs)
+ ).
+
+
+compare_rev_epochs([{Node, Seq} | SourceRest], [{Node, Seq} | TargetRest]) ->
+    % Common history, fast-forward
+    compare_rev_epochs(SourceRest, TargetRest);
+compare_rev_epochs([], [{_, TargetSeq} | _]) ->
+ % Source has not moved, start from seq just before the target took over
+ TargetSeq - 1;
+compare_rev_epochs([{_, SourceSeq} | _], []) ->
+ % Target has not moved, start from seq where source diverged
+ SourceSeq;
+compare_rev_epochs([{_, SourceSeq} | _], [{_, TargetSeq} | _]) ->
+ % The source was moved to a new location independently, take the minimum
+ erlang:min(SourceSeq, TargetSeq) - 1.
+
+
+%% @doc This adds a new update sequence checkpoint to the replication
+%% history. Checkpoints are keyed by the source node so that we
+%% aren't mixing history between source shard moves.
+add_checkpoint({Props}, {History}) ->
+ % Extract the source and target seqs for reference
+ SourceSeq = couch_util:get_value(<<"source_seq">>, Props),
+ TargetSeq = couch_util:get_value(<<"target_seq">>, Props),
+
+ % Get the history relevant to the source node.
+ SourceNode = couch_util:get_value(<<"source_node">>, Props),
+ SourceHistory = couch_util:get_value(SourceNode, History, []),
+
+ % If either the source or target shard has been truncated
+ % we need to filter out any history that was stored for
+ % any larger update seq than we're currently recording.
+ FilteredHistory = filter_history(SourceSeq, TargetSeq, SourceHistory),
+
+ % Re-bucket our history based on the most recent source
+ % sequence. This is where we drop old checkpoints to
+ % maintain the exponential distribution.
+ {_, RebucketedHistory} = rebucket(FilteredHistory, SourceSeq, 0),
+ NewSourceHistory = [{Props} | RebucketedHistory],
+
+ % Finally update the source node history and we're done.
+ NodeRemoved = lists:keydelete(SourceNode, 1, History),
+ {[{SourceNode, NewSourceHistory} | NodeRemoved]}.
+
+
+filter_history(SourceSeqThresh, TargetSeqThresh, History) ->
+ SourceFilter = fun({Entry}) ->
+ SourceSeq = couch_util:get_value(<<"source_seq">>, Entry),
+ SourceSeq < SourceSeqThresh
+ end,
+ TargetFilter = fun({Entry}) ->
+ TargetSeq = couch_util:get_value(<<"target_seq">>, Entry),
+ TargetSeq < TargetSeqThresh
+ end,
+ SourceFiltered = lists:filter(SourceFilter, History),
+ lists:filter(TargetFilter, SourceFiltered).
+
+
+%% @doc This function adjusts our history to maintain a
+%% history of checkpoints that follow an exponentially
+%% increasing age from the most recent checkpoint.
+%%
+%% The terms newest and oldest used in these comments
+%% refer to the (NewSeq - CurSeq) difference where smaller
+%% values are considered newer.
+%%
+%% It works by assigning each entry to a bucket and keeping
+%% the newest and oldest entry in each bucket. Keeping
+%% both the newest and oldest means that we won't end up
+%% with empty buckets as checkpoints are promoted to new
+%% buckets.
+%%
+%% The return value of this function is a two-tuple of the
+%% form `{BucketId, History}` where BucketId is the id of
+%% the bucket for the first entry in History. This is used
+%% when recursing to detect the oldest value in a given
+%% bucket.
+%%
+%% This function expects the provided history to be sorted
+%% in descending order of source_seq values.
+rebucket([], _NewSeq, Bucket) ->
+ {Bucket+1, []};
+rebucket([{Entry} | RestHistory], NewSeq, Bucket) ->
+ CurSeq = couch_util:get_value(<<"source_seq">>, Entry),
+ case find_bucket(NewSeq, CurSeq, Bucket) of
+ Bucket ->
+            % This entry is in an existing bucket which means
+            % we will only keep it if it's the oldest value
+            % in the bucket. To detect this we rebucket the
+            % rest of the list and only include Entry if the
+            % rest of the list is in a bigger bucket.
+ case rebucket(RestHistory, NewSeq, Bucket) of
+ {Bucket, NewHistory} ->
+ % There's another entry in this bucket so we drop the
+ % current entry.
+ {Bucket, NewHistory};
+ {NextBucket, NewHistory} when NextBucket > Bucket ->
+ % The rest of the history was rebucketed into a larger
+ % bucket so this is the oldest entry in the current
+ % bucket.
+ {Bucket, [{Entry} | NewHistory]}
+ end;
+ NextBucket when NextBucket > Bucket ->
+ % This entry is the newest in NextBucket so we add it
+ % to our history and continue rebucketing.
+ {_, NewHistory} = rebucket(RestHistory, NewSeq, NextBucket),
+ {NextBucket, [{Entry} | NewHistory]}
+ end.
+
+
+%% @doc Find the bucket id for the given sequence pair.
+find_bucket(NewSeq, CurSeq, Bucket) ->
+    % The +1 constant in this comparison is a bit subtle. It
+    % ensures that bucket 0 only ever holds the single most
+    % recent checkpoint (a difference of exactly 1). Update
+    % sequences are never duplicated, so every older entry
+    % has a difference of at least 2 and the +1 pushes it
+    % into bucket 1 or beyond.
+ if (NewSeq - CurSeq + 1) > (2 bsl Bucket) ->
+ find_bucket(NewSeq, CurSeq, Bucket+1);
+ true ->
+ Bucket
+ end.
+
+
+rexi_call(Node, MFA) ->
+ Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
+ Ref = rexi:cast(Node, self(), MFA, [sync]),
+ try
+ receive {Ref, {ok, Reply}} ->
+ Reply;
+ {Ref, Error} ->
+ erlang:error(Error);
+ {rexi_DOWN, Mon, _, Reason} ->
+ erlang:error({rexi_DOWN, {Node, Reason}})
+ after 600000 ->
+ erlang:error(timeout)
+ end
+ after
+ rexi_monitor:stop(Mon)
+ end.
+
+
+get_or_create_db(DbName, Options) ->
+ couch_db:open_int(DbName, [{create_if_missing, true} | Options]).
+
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+
+-define(SNODE, <<"src@localhost">>).
+-define(TNODE, <<"tgt@localhost">>).
+-define(SNODE_KV, {<<"source_node">>, ?SNODE}).
+-define(TNODE_KV, {<<"target_node">>, ?TNODE}).
+-define(SSEQ, <<"source_seq">>).
+-define(TSEQ, <<"target_seq">>).
+-define(ENTRY(S, T), {[?SNODE_KV, {?SSEQ, S}, ?TNODE_KV, {?TSEQ, T}]}).
+
+
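+% An illustrative check of compare_epochs/2, added editorially; it is not
+% part of the original suite and the node names are made up. Epochs are
+% {Node, Seq} pairs stored newest-first.
+compare_epochs_test() ->
+    Shared = {'node1@host', 0},
+    % Divergent tails: common seq is just before the earlier takeover
+    ?assertEqual(compare_epochs(
+        [{'node2@host', 10}, Shared],
+        [{'node3@host', 12}, Shared]), 9),
+    % Source never moved: start just before the target's takeover
+    ?assertEqual(compare_epochs(
+        [Shared],
+        [{'node3@host', 12}, Shared]), 11),
+    % Target never moved: start where the source diverged
+    ?assertEqual(compare_epochs(
+        [{'node2@host', 10}, Shared],
+        [Shared]), 10).
+
+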
+filter_history_data() ->
+ [
+ ?ENTRY(13, 15),
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ].
+
+
+filter_history_remove_none_test() ->
+ ?assertEqual(filter_history(20, 20, filter_history_data()), [
+ ?ENTRY(13, 15),
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]).
+
+
+filter_history_remove_all_test() ->
+ ?assertEqual(filter_history(1, 1, filter_history_data()), []).
+
+
+filter_history_remove_equal_test() ->
+ ?assertEqual(filter_history(10, 10, filter_history_data()), [
+ ?ENTRY(2, 3)
+ ]),
+ ?assertEqual(filter_history(11, 9, filter_history_data()), [
+ ?ENTRY(2, 3)
+ ]).
+
+
+filter_history_remove_for_source_and_target_test() ->
+ ?assertEqual(filter_history(11, 20, filter_history_data()), [
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]),
+ ?assertEqual(filter_history(14, 14, filter_history_data()), [
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]).
+
+
+filter_history_remove_for_both_test() ->
+ ?assertEqual(filter_history(11, 11, filter_history_data()), [
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]).
+
+
+filter_history_remove_for_both_again_test() ->
+ ?assertEqual(filter_history(3, 4, filter_history_data()), [
+ ?ENTRY(2, 3)
+ ]).
+
+
+add_first_checkpoint_test() ->
+ History = {[]},
+ ?assertEqual(add_checkpoint(?ENTRY(2, 3), History), {[
+ {?SNODE, [
+ ?ENTRY(2, 3)
+ ]}
+ ]}).
+
+
+add_first_checkpoint_to_empty_test() ->
+ History = {[{?SNODE, []}]},
+ ?assertEqual(add_checkpoint(?ENTRY(2, 3), History), {[
+ {?SNODE, [
+ ?ENTRY(2, 3)
+ ]}
+ ]}).
+
+
+add_second_checkpoint_test() ->
+ History = {[{?SNODE, [?ENTRY(2, 3)]}]},
+ ?assertEqual(add_checkpoint(?ENTRY(10, 9), History), {[
+ {?SNODE, [
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}
+ ]}).
+
+
+add_third_checkpoint_test() ->
+ History = {[{?SNODE, [
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}]},
+ ?assertEqual(add_checkpoint(?ENTRY(11, 10), History), {[
+ {?SNODE, [
+ ?ENTRY(11, 10),
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}
+ ]}).
+
+
+add_fourth_checkpoint_test() ->
+ History = {[{?SNODE, [
+ ?ENTRY(11, 10),
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}]},
+ ?assertEqual(add_checkpoint(?ENTRY(12, 13), History), {[
+ {?SNODE, [
+ ?ENTRY(12, 13),
+ ?ENTRY(11, 10),
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}
+ ]}).
+
+
+add_checkpoint_with_replacement_test() ->
+ History = {[{?SNODE, [
+ ?ENTRY(12, 13),
+ ?ENTRY(11, 10),
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}]},
+ % Picking a source_seq of 16 to force 10, 11, and 12
+ % into the same bucket to show we drop the 11 entry.
+ ?assertEqual(add_checkpoint(?ENTRY(16, 16), History), {[
+ {?SNODE, [
+ ?ENTRY(16, 16),
+ ?ENTRY(12, 13),
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}
+ ]}).
+
+add_checkpoint_drops_redundant_checkpoints_test() ->
+ % I've added comments showing the bucket ID based
+ % on the ?ENTRY passed to add_checkpoint
+ History = {[{?SNODE, [
+ ?ENTRY(15, 15), % Bucket 0
+ ?ENTRY(14, 14), % Bucket 1
+ ?ENTRY(13, 13), % Bucket 1
+ ?ENTRY(12, 12), % Bucket 2
+ ?ENTRY(11, 11), % Bucket 2
+ ?ENTRY(10, 10), % Bucket 2
+ ?ENTRY(9, 9), % Bucket 2
+ ?ENTRY(8, 8), % Bucket 3
+ ?ENTRY(7, 7), % Bucket 3
+ ?ENTRY(6, 6), % Bucket 3
+ ?ENTRY(5, 5), % Bucket 3
+ ?ENTRY(4, 4), % Bucket 3
+ ?ENTRY(3, 3), % Bucket 3
+ ?ENTRY(2, 2), % Bucket 3
+ ?ENTRY(1, 1) % Bucket 3
+ ]}]},
+ ?assertEqual(add_checkpoint(?ENTRY(16, 16), History), {[
+ {?SNODE, [
+ ?ENTRY(16, 16), % Bucket 0
+ ?ENTRY(15, 15), % Bucket 0
+ ?ENTRY(14, 14), % Bucket 1
+ ?ENTRY(13, 13), % Bucket 1
+ ?ENTRY(12, 12), % Bucket 2
+ ?ENTRY(9, 9), % Bucket 2
+ ?ENTRY(8, 8), % Bucket 3
+ ?ENTRY(1, 1) % Bucket 3
+ ]}
+ ]}).
+
+
+add_checkpoint_show_not_always_a_drop_test() ->
+ % Depending on the edge conditions of buckets we
+ % may not always drop values when adding new
+ % checkpoints. In this case 12 stays because there's
+ % no longer a value for 10 or 11.
+ %
+ % I've added comments showing the bucket ID based
+ % on the ?ENTRY passed to add_checkpoint
+ History = {[{?SNODE, [
+ ?ENTRY(16, 16), % Bucket 0
+ ?ENTRY(15, 15), % Bucket 1
+ ?ENTRY(14, 14), % Bucket 1
+ ?ENTRY(13, 13), % Bucket 2
+ ?ENTRY(12, 12), % Bucket 2
+ ?ENTRY(9, 9), % Bucket 3
+ ?ENTRY(8, 8), % Bucket 3
+ ?ENTRY(1, 1) % Bucket 4
+ ]}]},
+ ?assertEqual(add_checkpoint(?ENTRY(17, 17), History), {[
+ {?SNODE, [
+ ?ENTRY(17, 17), % Bucket 0
+ ?ENTRY(16, 16), % Bucket 0
+ ?ENTRY(15, 15), % Bucket 1
+ ?ENTRY(14, 14), % Bucket 1
+ ?ENTRY(13, 13), % Bucket 2
+ ?ENTRY(12, 12), % Bucket 2
+ ?ENTRY(9, 9), % Bucket 3
+ ?ENTRY(8, 8), % Bucket 3
+ ?ENTRY(1, 1) % Bucket 4
+ ]}
+ ]}).
+
+
+add_checkpoint_big_jump_show_lots_drop_test() ->
+ % I've added comments showing the bucket ID based
+ % on the ?ENTRY passed to add_checkpoint
+ History = {[{?SNODE, [
+ ?ENTRY(16, 16), % Bucket 4
+ ?ENTRY(15, 15), % Bucket 4
+ ?ENTRY(14, 14), % Bucket 4
+ ?ENTRY(13, 13), % Bucket 4
+ ?ENTRY(12, 12), % Bucket 4
+ ?ENTRY(9, 9), % Bucket 4
+ ?ENTRY(8, 8), % Bucket 4
+ ?ENTRY(1, 1) % Bucket 4
+ ]}]},
+ ?assertEqual(add_checkpoint(?ENTRY(32, 32), History), {[
+ {?SNODE, [
+ ?ENTRY(32, 32), % Bucket 0
+ ?ENTRY(16, 16), % Bucket 4
+ ?ENTRY(1, 1) % Bucket 4
+ ]}
+ ]}).
+
+
+add_checkpoint_show_filter_history_test() ->
+ History = {[{?SNODE, [
+ ?ENTRY(16, 16),
+ ?ENTRY(15, 15),
+ ?ENTRY(14, 14),
+ ?ENTRY(13, 13),
+ ?ENTRY(12, 12),
+ ?ENTRY(9, 9),
+ ?ENTRY(8, 8),
+ ?ENTRY(1, 1)
+ ]}]},
+ % Drop for both
+ ?assertEqual(add_checkpoint(?ENTRY(10, 10), History), {[
+ {?SNODE, [
+ ?ENTRY(10, 10),
+ ?ENTRY(9, 9),
+ ?ENTRY(8, 8),
+ ?ENTRY(1, 1)
+ ]}
+ ]}),
+    % Drop for source
+ ?assertEqual(add_checkpoint(?ENTRY(10, 200), History), {[
+ {?SNODE, [
+ ?ENTRY(10, 200),
+ ?ENTRY(9, 9),
+ ?ENTRY(8, 8),
+ ?ENTRY(1, 1)
+ ]}
+ ]}),
+    % Drop for target. Obviously a source_seq of 200
+    % will end up dropping the 8 entry.
+ ?assertEqual(add_checkpoint(?ENTRY(200, 10), History), {[
+ {?SNODE, [
+ ?ENTRY(200, 10),
+ ?ENTRY(9, 9),
+ ?ENTRY(1, 1)
+ ]}
+ ]}).
+
+
+add_checkpoint_from_other_node_test() ->
+ History = {[{<<"not_the_source">>, [
+ ?ENTRY(12, 13),
+ ?ENTRY(11, 10),
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}]},
+ % No filtering
+ ?assertEqual(add_checkpoint(?ENTRY(1, 1), History), {[
+ {?SNODE, [
+ ?ENTRY(1, 1)
+ ]},
+ {<<"not_the_source">>, [
+ ?ENTRY(12, 13),
+ ?ENTRY(11, 10),
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}
+ ]}),
+ % No dropping
+ ?assertEqual(add_checkpoint(?ENTRY(200, 200), History), {[
+ {?SNODE, [
+ ?ENTRY(200, 200)
+ ]},
+ {<<"not_the_source">>, [
+ ?ENTRY(12, 13),
+ ?ENTRY(11, 10),
+ ?ENTRY(10, 9),
+ ?ENTRY(2, 3)
+ ]}
+ ]}).
+
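+
+% A sketch (editorial addition) pinning down the bucket boundaries that the
+% rebucketing tests above rely on: with NewSeq = 16, differences of 1, 2-3,
+% 4-7 and 8-15 fall into buckets 0, 1, 2 and 3 respectively.
+find_bucket_test() ->
+    ?assertEqual(find_bucket(16, 15, 0), 0),
+    ?assertEqual(find_bucket(16, 14, 0), 1),
+    ?assertEqual(find_bucket(16, 13, 0), 1),
+    ?assertEqual(find_bucket(16, 12, 0), 2),
+    ?assertEqual(find_bucket(16, 9, 0), 2),
+    ?assertEqual(find_bucket(16, 8, 0), 3),
+    ?assertEqual(find_bucket(16, 1, 0), 3).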
+
+-endif.
diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl
new file mode 100644
index 000000000..c7f33c61f
--- /dev/null
+++ b/src/mem3/src/mem3_shards.erl
@@ -0,0 +1,419 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_shards).
+-behaviour(gen_server).
+-vsn(3).
+-behaviour(config_listener).
+
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+-export([handle_config_change/5, handle_config_terminate/3]).
+
+-export([start_link/0]).
+-export([for_db/1, for_db/2, for_docid/2, for_docid/3, get/3, local/1, fold/2]).
+-export([for_shard_name/1]).
+-export([set_max_size/1]).
+
+-record(st, {
+ max_size = 25000,
+ cur_size = 0,
+ changes_pid
+}).
+
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(DBS, mem3_dbs).
+-define(SHARDS, mem3_shards).
+-define(ATIMES, mem3_atimes).
+-define(RELISTEN_DELAY, 5000).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+for_db(DbName) ->
+ for_db(DbName, []).
+
+for_db(DbName, Options) ->
+ Shards = try ets:lookup(?SHARDS, DbName) of
+ [] ->
+ load_shards_from_disk(DbName);
+ Else ->
+ gen_server:cast(?MODULE, {cache_hit, DbName}),
+ Else
+ catch error:badarg ->
+ load_shards_from_disk(DbName)
+ end,
+ case lists:member(ordered, Options) of
+ true -> Shards;
+ false -> mem3_util:downcast(Shards)
+ end.
+
+for_docid(DbName, DocId) ->
+ for_docid(DbName, DocId, []).
+
+for_docid(DbName, DocId, Options) ->
+ HashKey = mem3_util:hash(DocId),
+ ShardHead = #shard{
+ name = '_',
+ node = '_',
+ dbname = DbName,
+ range = ['$1','$2'],
+ ref = '_'
+ },
+ OrderedShardHead = #ordered_shard{
+ name = '_',
+ node = '_',
+ dbname = DbName,
+ range = ['$1','$2'],
+ ref = '_',
+ order = '_'
+ },
+ Conditions = [{'=<', '$1', HashKey}, {'=<', HashKey, '$2'}],
+ ShardSpec = {ShardHead, Conditions, ['$_']},
+ OrderedShardSpec = {OrderedShardHead, Conditions, ['$_']},
+ Shards = try ets:select(?SHARDS, [ShardSpec, OrderedShardSpec]) of
+ [] ->
+ load_shards_from_disk(DbName, DocId);
+ Else ->
+ gen_server:cast(?MODULE, {cache_hit, DbName}),
+ Else
+ catch error:badarg ->
+ load_shards_from_disk(DbName, DocId)
+ end,
+ case lists:member(ordered, Options) of
+ true -> Shards;
+ false -> mem3_util:downcast(Shards)
+ end.
+
+for_shard_name(ShardName) ->
+ for_shard_name(ShardName, []).
+
+for_shard_name(ShardName, Options) ->
+ DbName = mem3:dbname(ShardName),
+ ShardHead = #shard{
+ name = ShardName,
+ node = '_',
+ dbname = DbName,
+ range = '_',
+ ref = '_'
+ },
+ OrderedShardHead = #ordered_shard{
+ name = ShardName,
+ node = '_',
+ dbname = DbName,
+ range = '_',
+ ref = '_',
+ order = '_'
+ },
+ ShardSpec = {ShardHead, [], ['$_']},
+ OrderedShardSpec = {OrderedShardHead, [], ['$_']},
+ Shards = try ets:select(?SHARDS, [ShardSpec, OrderedShardSpec]) of
+ [] ->
+ filter_shards_by_name(ShardName, load_shards_from_disk(DbName));
+ Else ->
+ gen_server:cast(?MODULE, {cache_hit, DbName}),
+ Else
+ catch error:badarg ->
+ filter_shards_by_name(ShardName, load_shards_from_disk(DbName))
+ end,
+ case lists:member(ordered, Options) of
+ true -> Shards;
+ false -> mem3_util:downcast(Shards)
+ end.
+
+get(DbName, Node, Range) ->
+ Res = lists:foldl(fun(#shard{node=N, range=R}=S, Acc) ->
+ case {N, R} of
+ {Node, Range} -> [S | Acc];
+ _ -> Acc
+ end
+ end, [], for_db(DbName)),
+ case Res of
+ [] -> {error, not_found};
+ [Shard] -> {ok, Shard};
+ [_|_] -> {error, duplicates}
+ end.
+
+local(DbName) when is_list(DbName) ->
+ local(list_to_binary(DbName));
+local(DbName) ->
+ Pred = fun(#shard{node=Node}) when Node == node() -> true; (_) -> false end,
+ lists:filter(Pred, for_db(DbName)).
+
+fold(Fun, Acc) ->
+ DbName = config:get("mem3", "shards_db", "_dbs"),
+ {ok, Db} = mem3_util:ensure_exists(DbName),
+ FAcc = {Db, Fun, Acc},
+ try
+ {ok, _, LastAcc} = couch_db:enum_docs(Db, fun fold_fun/3, FAcc, []),
+ {_Db, _UFun, UAcc} = LastAcc,
+ UAcc
+ after
+ couch_db:close(Db)
+ end.
+
+set_max_size(Size) when is_integer(Size), Size > 0 ->
+ gen_server:call(?MODULE, {set_max_size, Size}).
+
+handle_config_change("mem3", "shard_cache_size", SizeList, _, _) ->
+ Size = list_to_integer(SizeList),
+ {ok, gen_server:call(?MODULE, {set_max_size, Size}, infinity)};
+handle_config_change("mem3", "shards_db", _DbName, _, _) ->
+ {ok, gen_server:call(?MODULE, shard_db_changed, infinity)};
+handle_config_change(_, _, _, _, _) ->
+ {ok, nil}.
+
+handle_config_terminate(_, stop, _) ->
+ ok;
+handle_config_terminate(_Server, _Reason, _State) ->
+ erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener).
+
+init([]) ->
+ ets:new(?SHARDS, [
+ bag,
+ protected,
+ named_table,
+ {keypos,#shard.dbname},
+ {read_concurrency, true}
+ ]),
+ ets:new(?DBS, [set, protected, named_table]),
+ ets:new(?ATIMES, [ordered_set, protected, named_table]),
+ ok = config:listen_for_changes(?MODULE, nil),
+ SizeList = config:get("mem3", "shard_cache_size", "25000"),
+ {Pid, _} = spawn_monitor(fun() -> listen_for_changes(get_update_seq()) end),
+ {ok, #st{
+ max_size = list_to_integer(SizeList),
+ cur_size = 0,
+ changes_pid = Pid
+ }}.
+
+handle_call({set_max_size, Size}, _From, St) ->
+ {reply, ok, cache_free(St#st{max_size=Size})};
+handle_call(shard_db_changed, _From, St) ->
+ exit(St#st.changes_pid, shard_db_changed),
+ {reply, ok, St};
+handle_call(_Call, _From, St) ->
+ {noreply, St}.
+
+handle_cast({cache_hit, DbName}, St) ->
+ couch_stats:increment_counter([mem3, shard_cache, hit]),
+ cache_hit(DbName),
+ {noreply, St};
+handle_cast({cache_insert, DbName, Shards}, St) ->
+ couch_stats:increment_counter([mem3, shard_cache, miss]),
+ {noreply, cache_free(cache_insert(St, DbName, Shards))};
+handle_cast({cache_remove, DbName}, St) ->
+ couch_stats:increment_counter([mem3, shard_cache, eviction]),
+ {noreply, cache_remove(St, DbName)};
+handle_cast(_Msg, St) ->
+ {noreply, St}.
+
+handle_info({'DOWN', _, _, Pid, Reason}, #st{changes_pid=Pid}=St) ->
+ {NewSt, Seq} = case Reason of
+ {seq, EndSeq} ->
+ {St, EndSeq};
+ shard_db_changed ->
+ {cache_clear(St), get_update_seq()};
+ _ ->
+ couch_log:notice("~p changes listener died ~p", [?MODULE, Reason]),
+ {St, get_update_seq()}
+ end,
+ erlang:send_after(5000, self(), {start_listener, Seq}),
+ {noreply, NewSt#st{changes_pid=undefined}};
+handle_info({start_listener, Seq}, St) ->
+ {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
+ {noreply, St#st{changes_pid=NewPid}};
+handle_info(restart_config_listener, State) ->
+ ok = config:listen_for_changes(?MODULE, nil),
+ {noreply, State};
+handle_info(_Msg, St) ->
+ {noreply, St}.
+
+terminate(_Reason, #st{changes_pid=Pid}) ->
+ exit(Pid, kill),
+ ok.
+
+code_change(_OldVsn, #st{}=St, _Extra) ->
+ {ok, St}.
+
+%% internal functions
+
+fold_fun(#full_doc_info{}=FDI, _, Acc) ->
+ DI = couch_doc:to_doc_info(FDI),
+ fold_fun(DI, nil, Acc);
+fold_fun(#doc_info{}=DI, _, {Db, UFun, UAcc}) ->
+ case couch_db:open_doc(Db, DI, [ejson_body, conflicts]) of
+ {ok, Doc} ->
+ {Props} = Doc#doc.body,
+ Shards = mem3_util:build_shards(Doc#doc.id, Props),
+ NewUAcc = lists:foldl(UFun, UAcc, Shards),
+ {ok, {Db, UFun, NewUAcc}};
+ _ ->
+ {ok, {Db, UFun, UAcc}}
+ end.
+
+get_update_seq() ->
+ DbName = config:get("mem3", "shards_db", "_dbs"),
+ {ok, Db} = mem3_util:ensure_exists(DbName),
+ couch_db:close(Db),
+ Db#db.update_seq.
+
+listen_for_changes(Since) ->
+ DbName = config:get("mem3", "shards_db", "_dbs"),
+ {ok, Db} = mem3_util:ensure_exists(DbName),
+ Args = #changes_args{
+ feed = "continuous",
+ since = Since,
+ heartbeat = true,
+ include_docs = true
+ },
+ ChangesFun = couch_changes:handle_db_changes(Args, Since, Db),
+ ChangesFun(fun changes_callback/2).
+
+changes_callback(start, Acc) ->
+ {ok, Acc};
+changes_callback({stop, EndSeq}, _) ->
+ exit({seq, EndSeq});
+changes_callback({change, {Change}, _}, _) ->
+ DbName = couch_util:get_value(<<"id">>, Change),
+ case DbName of <<"_design/", _/binary>> -> ok; _Else ->
+ case mem3_util:is_deleted(Change) of
+ true ->
+ gen_server:cast(?MODULE, {cache_remove, DbName});
+ false ->
+ case couch_util:get_value(doc, Change) of
+ {error, Reason} ->
+ couch_log:error("missing partition table for ~s: ~p",
+ [DbName, Reason]);
+ {Doc} ->
+ Shards = mem3_util:build_ordered_shards(DbName, Doc),
+ gen_server:cast(?MODULE, {cache_insert, DbName, Shards}),
+ [create_if_missing(mem3:name(S)) || S
+ <- Shards, mem3:node(S) =:= node()]
+ end
+ end
+ end,
+ {ok, couch_util:get_value(<<"seq">>, Change)};
+changes_callback(timeout, _) ->
+ ok.
+
+load_shards_from_disk(DbName) when is_binary(DbName) ->
+ X = ?l2b(config:get("mem3", "shards_db", "_dbs")),
+ {ok, Db} = mem3_util:ensure_exists(X),
+ try
+ load_shards_from_db(Db, DbName)
+ after
+ couch_db:close(Db)
+ end.
+
+load_shards_from_db(#db{} = ShardDb, DbName) ->
+ case couch_db:open_doc(ShardDb, DbName, [ejson_body]) of
+ {ok, #doc{body = {Props}}} ->
+ Shards = mem3_util:build_ordered_shards(DbName, Props),
+ gen_server:cast(?MODULE, {cache_insert, DbName, Shards}),
+ Shards;
+ {not_found, _} ->
+ erlang:error(database_does_not_exist, ?b2l(DbName))
+ end.
+
+load_shards_from_disk(DbName, DocId)->
+ Shards = load_shards_from_disk(DbName),
+ HashKey = mem3_util:hash(DocId),
+ [S || S <- Shards, in_range(S, HashKey)].
+
+in_range(Shard, HashKey) ->
+ [B, E] = mem3:range(Shard),
+ B =< HashKey andalso HashKey =< E.
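+
+%% Illustrative example (editorial): for a q=4 database the ranges partition
+%% the 2^32 keyspace as 00000000-3fffffff, 40000000-7fffffff,
+%% 80000000-bfffffff and c0000000-ffffffff, so a doc id hashing to
+%% 16#5eed1234 lands in the second range.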
+
+create_if_missing(Name) ->
+ DbDir = config:get("couchdb", "database_dir"),
+ Filename = filename:join(DbDir, ?b2l(Name) ++ ".couch"),
+ case filelib:is_regular(Filename) of
+ true ->
+ ok;
+ false ->
+ case couch_server:create(Name, [?ADMIN_CTX]) of
+ {ok, Db} ->
+ couch_db:close(Db);
+ Error ->
+ couch_log:error("~p tried to create ~s, got ~p",
+ [?MODULE, Name, Error])
+ end
+ end.
+
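+%% A note on the cache bookkeeping (editorial): ?SHARDS is a bag keyed by
+%% dbname, ?DBS maps dbname -> last access time, and ?ATIMES is an
+%% ordered_set of {ATime, DbName} pairs so that cache_free/1 can evict the
+%% least recently used entry via ets:first/1. now/0 is used for access
+%% times because its values are unique and monotonic, which keeps ?ATIMES
+%% keys collision-free.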
+cache_insert(#st{cur_size=Cur}=St, DbName, Shards) ->
+ NewATime = now(),
+ true = ets:delete(?SHARDS, DbName),
+ true = ets:insert(?SHARDS, Shards),
+ case ets:lookup(?DBS, DbName) of
+ [{DbName, ATime}] ->
+ true = ets:delete(?ATIMES, ATime),
+ true = ets:insert(?ATIMES, {NewATime, DbName}),
+ true = ets:insert(?DBS, {DbName, NewATime}),
+ St;
+ [] ->
+ true = ets:insert(?ATIMES, {NewATime, DbName}),
+ true = ets:insert(?DBS, {DbName, NewATime}),
+ St#st{cur_size=Cur + 1}
+ end.
+
+cache_remove(#st{cur_size=Cur}=St, DbName) ->
+ true = ets:delete(?SHARDS, DbName),
+ case ets:lookup(?DBS, DbName) of
+ [{DbName, ATime}] ->
+ true = ets:delete(?DBS, DbName),
+ true = ets:delete(?ATIMES, ATime),
+ St#st{cur_size=Cur-1};
+ [] ->
+ St
+ end.
+
+cache_hit(DbName) ->
+ case ets:lookup(?DBS, DbName) of
+ [{DbName, ATime}] ->
+ NewATime = now(),
+ true = ets:delete(?ATIMES, ATime),
+ true = ets:insert(?ATIMES, {NewATime, DbName}),
+ true = ets:insert(?DBS, {DbName, NewATime});
+ [] ->
+ ok
+ end.
+
+cache_free(#st{max_size=Max, cur_size=Cur}=St) when Max =< Cur ->
+ ATime = ets:first(?ATIMES),
+ [{ATime, DbName}] = ets:lookup(?ATIMES, ATime),
+ true = ets:delete(?ATIMES, ATime),
+ true = ets:delete(?DBS, DbName),
+ true = ets:delete(?SHARDS, DbName),
+ cache_free(St#st{cur_size=Cur-1});
+cache_free(St) ->
+ St.
+
+cache_clear(St) ->
+ true = ets:delete_all_objects(?DBS),
+ true = ets:delete_all_objects(?SHARDS),
+ true = ets:delete_all_objects(?ATIMES),
+ St#st{cur_size=0}.
+
+filter_shards_by_name(Name, Shards) ->
+ filter_shards_by_name(Name, [], Shards).
+
+filter_shards_by_name(_, Matches, []) ->
+ Matches;
+filter_shards_by_name(Name, Matches, [#ordered_shard{name=Name}=S|Ss]) ->
+ filter_shards_by_name(Name, [S|Matches], Ss);
+filter_shards_by_name(Name, Matches, [#shard{name=Name}=S|Ss]) ->
+ filter_shards_by_name(Name, [S|Matches], Ss);
+filter_shards_by_name(Name, Matches, [_|Ss]) ->
+ filter_shards_by_name(Name, Matches, Ss).
diff --git a/src/mem3/src/mem3_sup.erl b/src/mem3/src/mem3_sup.erl
new file mode 100644
index 000000000..80b8ca37f
--- /dev/null
+++ b/src/mem3/src/mem3_sup.erl
@@ -0,0 +1,35 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_sup).
+-behaviour(supervisor).
+-export([start_link/0, init/1]).
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init(_Args) ->
+ Children = [
+ child(mem3_events),
+ child(mem3_nodes),
+ child(mem3_sync_nodes), % Order important?
+ child(mem3_sync),
+ child(mem3_shards),
+ child(mem3_sync_event_listener)
+ ],
+ {ok, {{one_for_one,10,1}, couch_epi:register_service(mem3_epi, Children)}}.
+
+child(mem3_events) ->
+ MFA = {gen_event, start_link, [{local, mem3_events}]},
+ {mem3_events, MFA, permanent, 1000, worker, dynamic};
+child(Child) ->
+ {Child, {Child, start_link, []}, permanent, 1000, worker, [Child]}.
diff --git a/src/mem3/src/mem3_sync.erl b/src/mem3/src/mem3_sync.erl
new file mode 100644
index 000000000..640181509
--- /dev/null
+++ b/src/mem3/src/mem3_sync.erl
@@ -0,0 +1,319 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_sync).
+-behaviour(gen_server).
+-vsn(1).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0, get_active/0, get_queue/0, push/1, push/2,
+ remove_node/1, remove_shard/1, initial_sync/1, get_backlog/0, nodes_db/0,
+ shards_db/0, users_db/0, find_next_node/0]).
+
+-import(queue, [in/2, out/1, to_list/1, join/2, from_list/1, is_empty/1]).
+
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-record(state, {
+ active = [],
+ count = 0,
+ limit,
+ dict = dict:new(),
+ waiting = queue:new()
+}).
+
+-record(job, {name, node, count=nil, pid=nil}).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_active() ->
+ gen_server:call(?MODULE, get_active).
+
+get_queue() ->
+ gen_server:call(?MODULE, get_queue).
+
+get_backlog() ->
+ gen_server:call(?MODULE, get_backlog).
+
+push(#shard{name = Name}, Target) ->
+ push(Name, Target);
+push(Name, #shard{node=Node}) ->
+ push(Name, Node);
+push(Name, Node) ->
+ push(#job{name = Name, node = Node}).
+
+push(#job{node = Node} = Job) when Node =/= node() ->
+ gen_server:cast(?MODULE, {push, Job});
+push(_) ->
+ ok.
+
+remove_node(Node) ->
+ gen_server:cast(?MODULE, {remove_node, Node}).
+
+remove_shard(Shard) ->
+ gen_server:cast(?MODULE, {remove_shard, Shard}).
+
+init([]) ->
+ process_flag(trap_exit, true),
+ Concurrency = config:get("mem3", "sync_concurrency", "10"),
+ gen_event:add_handler(mem3_events, mem3_sync_event, []),
+ initial_sync(),
+ {ok, #state{limit = list_to_integer(Concurrency)}}.
+
+handle_call({push, Job}, From, State) ->
+ handle_cast({push, Job#job{pid = From}}, State);
+
+handle_call(get_active, _From, State) ->
+ {reply, State#state.active, State};
+
+handle_call(get_queue, _From, State) ->
+ {reply, to_list(State#state.waiting), State};
+
+handle_call(get_backlog, _From, #state{active=A, waiting=WQ} = State) ->
+ CA = lists:sum([C || #job{count=C} <- A, is_integer(C)]),
+ CW = lists:sum([C || #job{count=C} <- to_list(WQ), is_integer(C)]),
+ {reply, CA+CW, State}.
+
+handle_cast({push, DbName, Node}, State) ->
+ handle_cast({push, #job{name = DbName, node = Node}}, State);
+
+handle_cast({push, Job}, #state{count=Count, limit=Limit} = State)
+ when Count >= Limit ->
+ {noreply, add_to_queue(State, Job)};
+
+handle_cast({push, Job}, State) ->
+ #state{active = L, count = C} = State,
+ #job{name = DbName, node = Node} = Job,
+ case is_running(DbName, Node, L) of
+ true ->
+ {noreply, add_to_queue(State, Job)};
+ false ->
+ Pid = start_push_replication(Job),
+ {noreply, State#state{active=[Job#job{pid=Pid}|L], count=C+1}}
+ end;
+
+handle_cast({remove_node, Node}, #state{waiting = W0} = State) ->
+ {Alive, Dead} = lists:partition(fun(#job{node=N}) -> N =/= Node end, to_list(W0)),
+ Dict = remove_entries(State#state.dict, Dead),
+ [exit(Pid, die_now) || #job{node=N, pid=Pid} <- State#state.active,
+ N =:= Node],
+ {noreply, State#state{dict = Dict, waiting = from_list(Alive)}};
+
+handle_cast({remove_shard, Shard}, #state{waiting = W0} = State) ->
+ {Alive, Dead} = lists:partition(fun(#job{name=S}) ->
+ S =/= Shard end, to_list(W0)),
+ Dict = remove_entries(State#state.dict, Dead),
+ [exit(Pid, die_now) || #job{name=S, pid=Pid} <- State#state.active,
+ S =:= Shard],
+ {noreply, State#state{dict = Dict, waiting = from_list(Alive)}}.
+
+handle_info({'EXIT', Active, normal}, State) ->
+ handle_replication_exit(State, Active);
+
+handle_info({'EXIT', Active, die_now}, State) ->
+ % we forced this one ourselves, do not retry
+ handle_replication_exit(State, Active);
+
+handle_info({'EXIT', Active, {{not_found, no_db_file}, _Stack}}, State) ->
+ % target doesn't exist, do not retry
+ handle_replication_exit(State, Active);
+
+handle_info({'EXIT', Active, Reason}, State) ->
+ NewState = case lists:keyfind(Active, #job.pid, State#state.active) of
+ #job{name=OldDbName, node=OldNode} = Job ->
+ couch_log:warning("~s ~s ~s ~w", [?MODULE, OldDbName, OldNode, Reason]),
+ case Reason of {pending_changes, Count} ->
+ maybe_resubmit(State, Job#job{pid = nil, count = Count});
+ _ ->
+ try mem3:shards(mem3:dbname(Job#job.name)) of _ ->
+ timer:apply_after(5000, ?MODULE, push, [Job#job{pid=nil}])
+ catch error:database_does_not_exist ->
+ % no need to retry
+ ok
+ end,
+ State
+ end;
+ false -> State end,
+ handle_replication_exit(NewState, Active);
+
+handle_info(Msg, State) ->
+ couch_log:notice("unexpected msg at replication manager ~p", [Msg]),
+ {noreply, State}.
+
+terminate(_Reason, State) ->
+ [exit(Pid, shutdown) || #job{pid=Pid} <- State#state.active],
+ ok.
+
+code_change(_, #state{waiting = WaitingList} = State, _) when is_list(WaitingList) ->
+ {ok, State#state{waiting = from_list(WaitingList)}};
+
+code_change(_, State, _) ->
+ {ok, State}.
+
+maybe_resubmit(State, #job{name=DbName, node=Node} = Job) ->
+ case lists:member(DbName, local_dbs()) of
+ true ->
+ case find_next_node() of
+ Node ->
+ add_to_queue(State, Job);
+ _ ->
+ State % don't resubmit b/c we have a new replication target
+ end;
+ false ->
+ add_to_queue(State, Job)
+ end.
+
+handle_replication_exit(State, Pid) ->
+ #state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State,
+ Active1 = lists:keydelete(Pid, #job.pid, Active),
+ case is_empty(Waiting) of
+ true ->
+ {noreply, State#state{active=Active1, count=length(Active1)}};
+ _ ->
+ Count = length(Active1),
+ NewState = if Count < Limit ->
+ case next_replication(Active1, Waiting, queue:new()) of
+ nil -> % all waiting replications are also active
+ State#state{active = Active1, count = Count};
+ {#job{name=DbName, node=Node} = Job, StillWaiting} ->
+ NewPid = start_push_replication(Job),
+ State#state{
+ active = [Job#job{pid = NewPid} | Active1],
+ count = Count+1,
+ dict = dict:erase({DbName,Node}, D),
+ waiting = StillWaiting
+ }
+ end;
+ true ->
+ State#state{active = Active1, count=Count}
+ end,
+ {noreply, NewState}
+ end.
+
+start_push_replication(#job{name=Name, node=Node, pid=From}) ->
+ if From =/= nil -> gen_server:reply(From, ok); true -> ok end,
+ spawn_link(fun() ->
+ case mem3_rep:go(Name, maybe_redirect(Node)) of
+ {ok, Pending} when Pending > 0 ->
+ exit({pending_changes, Pending});
+ _ ->
+ ok
+ end
+ end).
+
+add_to_queue(State, #job{name=DbName, node=Node, pid=From} = Job) ->
+ #state{dict=D, waiting=WQ} = State,
+ case dict:is_key({DbName, Node}, D) of
+ true ->
+ if From =/= nil -> gen_server:reply(From, ok); true -> ok end,
+ State;
+ false ->
+ couch_log:debug("adding ~s -> ~p to mem3_sync queue", [DbName, Node]),
+ State#state{
+ dict = dict:store({DbName,Node}, ok, D),
+ waiting = in(Job, WQ)
+ }
+ end.
+
+sync_nodes_and_dbs() ->
+ Node = find_next_node(),
+ [push(Db, Node) || Db <- local_dbs()].
+
+initial_sync() ->
+ [net_kernel:connect_node(Node) || Node <- mem3:nodes()],
+ mem3_sync_nodes:add(nodes()).
+
+initial_sync(Live) ->
+ sync_nodes_and_dbs(),
+ Acc = {node(), Live, []},
+ {_, _, Shards} = mem3_shards:fold(fun initial_sync_fold/2, Acc),
+ submit_replication_tasks(node(), Live, Shards).
+
+initial_sync_fold(#shard{dbname = Db} = Shard, {LocalNode, Live, AccShards}) ->
+ case AccShards of
+ [#shard{dbname = AccDb} | _] when Db =/= AccDb ->
+ submit_replication_tasks(LocalNode, Live, AccShards),
+ {LocalNode, Live, [Shard]};
+ _ ->
+ {LocalNode, Live, [Shard|AccShards]}
+ end.
+
+submit_replication_tasks(LocalNode, Live, Shards) ->
+ SplitFun = fun(#shard{node = Node}) -> Node =:= LocalNode end,
+ {Local, Remote} = lists:partition(SplitFun, Shards),
+ lists:foreach(fun(#shard{name = ShardName}) ->
+ [sync_push(ShardName, N) || #shard{node=N, name=Name} <- Remote,
+ Name =:= ShardName, lists:member(N, Live)]
+ end, Local).
+
+sync_push(ShardName, N) ->
+ gen_server:call(mem3_sync, {push, #job{name=ShardName, node=N}}, infinity).
+
+find_next_node() ->
+ LiveNodes = [node()|nodes()],
+ AllNodes0 = lists:sort(mem3:nodes()),
+ AllNodes1 = [X || X <- AllNodes0, lists:member(X, LiveNodes)],
+ AllNodes = AllNodes1 ++ [hd(AllNodes1)],
+ [_Self, Next| _] = lists:dropwhile(fun(N) -> N =/= node() end, AllNodes),
+ Next.
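+
+%% Illustrative example (not from the original source): if mem3:nodes()
+%% returns [a, b, c], all are connected, and node() =:= b, then the padded
+%% ring is [a, b, c, a] and find_next_node/0 returns c; on node c it wraps
+%% around and returns a.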
+
+%% @doc Finds the next {DbName,Node} pair in the list of waiting replications
+%% which does not correspond to an already running replication
+-spec next_replication([#job{}], queue:queue(_), queue:queue(_)) ->
+ {#job{}, queue:queue(_)} | nil.
+next_replication(Active, Waiting, WaitingAndRunning) ->
+ case is_empty(Waiting) of
+ true ->
+ nil;
+ false ->
+ {{value, #job{name=S, node=N} = Job}, RemQ} = out(Waiting),
+ case is_running(S,N,Active) of
+ true ->
+ next_replication(Active, RemQ, in(Job, WaitingAndRunning));
+ false ->
+ {Job, join(RemQ, WaitingAndRunning)}
+ end
+ end.
+
+is_running(DbName, Node, ActiveList) ->
+ [] =/= [true || #job{name=S, node=N} <- ActiveList, S=:=DbName, N=:=Node].
+
+remove_entries(Dict, Entries) ->
+ lists:foldl(fun(#job{name=S, node=N}, D) ->
+ dict:erase({S, N}, D)
+ end, Dict, Entries).
+
+local_dbs() ->
+ [nodes_db(), shards_db(), users_db()].
+
+nodes_db() ->
+ ?l2b(config:get("mem3", "nodes_db", "_nodes")).
+
+shards_db() ->
+ ?l2b(config:get("mem3", "shards_db", "_dbs")).
+
+users_db() ->
+ ?l2b(config:get("couch_httpd_auth", "authentication_db", "_users")).
+
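+%% maybe_redirect/1 (below) consults the [mem3.redirects] config section.
+%% An illustrative ini entry:
+%%
+%%   [mem3.redirects]
+%%   node1@127.0.0.1 = node2@127.0.0.1
+%%
+%% reroutes internal replication pushes aimed at node1 to node2.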
+maybe_redirect(Node) ->
+ case config:get("mem3.redirects", atom_to_list(Node)) of
+ undefined ->
+ Node;
+ Redirect ->
+ couch_log:debug("Redirecting push from ~p to ~p", [Node, Redirect]),
+ list_to_existing_atom(Redirect)
+ end.
diff --git a/src/mem3/src/mem3_sync_event.erl b/src/mem3/src/mem3_sync_event.erl
new file mode 100644
index 000000000..7bca23086
--- /dev/null
+++ b/src/mem3/src/mem3_sync_event.erl
@@ -0,0 +1,86 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_sync_event).
+-behaviour(gen_event).
+-vsn(1).
+
+-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+init(_) ->
+ net_kernel:monitor_nodes(true),
+ {ok, nil}.
+
+handle_event({add_node, Node}, State) when Node =/= node() ->
+ net_kernel:connect_node(Node),
+ mem3_sync_nodes:add([Node]),
+ {ok, State};
+
+handle_event({remove_node, Node}, State) ->
+ mem3_sync:remove_node(Node),
+ {ok, State};
+
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_call(_Request, State) ->
+ {ok, ok, State}.
+
+handle_info({nodeup, Node}, State) ->
+ Nodes0 = lists:usort([node() | drain_nodeups([Node])]),
+ Nodes = lists:filter(fun(N) -> lists:member(N, mem3:nodes()) end, Nodes0),
+ wait_for_rexi(Nodes, 5),
+ {ok, State};
+
+handle_info({nodedown, Node}, State) ->
+ mem3_sync:remove_node(Node),
+ {ok, State};
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+drain_nodeups(Acc) ->
+ receive
+ {nodeup, Node} ->
+ drain_nodeups([Node | Acc])
+ after 0 ->
+ Acc
+ end.
+
+wait_for_rexi([], _Retries) ->
+ ok;
+wait_for_rexi(Waiting, Retries) ->
+ % Hack around rpc:multicall/4 so that we can
+ % be sure which nodes gave which response
+ Msg = {call, rexi_server_mon, status, [], group_leader()},
+ {Resp, _Bad} = gen_server:multi_call(Waiting, rex, Msg, 1000),
+ Up = [N || {N, R} <- Resp, R == ok],
+ NotUp = Waiting -- Up,
+ case length(Up) > 0 of
+ true ->
+ mem3_sync_nodes:add(Up);
+ false -> ok
+ end,
+ case length(NotUp) > 0 andalso Retries > 0 of
+ true ->
+ timer:sleep(1000),
+ wait_for_rexi(NotUp, Retries-1);
+ false ->
+ ok
+ end.
diff --git a/src/mem3/src/mem3_sync_event_listener.erl b/src/mem3/src/mem3_sync_event_listener.erl
new file mode 100644
index 000000000..7859c3109
--- /dev/null
+++ b/src/mem3/src/mem3_sync_event_listener.erl
@@ -0,0 +1,309 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_sync_event_listener).
+-behavior(couch_event_listener).
+-vsn(1).
+
+-export([
+ start_link/0
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_event/3,
+ handle_cast/2,
+ handle_info/2
+]).
+
+-include_lib("mem3/include/mem3.hrl").
+
+-ifdef(TEST).
+-define(RELISTEN_DELAY, 500).
+-else.
+-define(RELISTEN_DELAY, 5000).
+-endif.
+
+-record(state, {
+ nodes,
+ shards,
+ users,
+ delay,
+ frequency,
+ last_push,
+ buckets
+}).
+
+%% Calling mem3_sync:push/2 on every update has a measurable performance cost,
+%% so we'd like to coalesce multiple update messages from couch_event into a
+%% single push call. Doing this while ensuring both correctness (i.e., no lost
+%% updates) and an even load profile is somewhat subtle. This implementation
+%% groups updated shards in a list of "buckets" (see bucket_shard/2) and
+%% guarantees that each shard is in no more than one bucket at a time - i.e.,
+%% any update messages received before the shard's current bucket has been
+%% pushed will be ignored - thereby reducing the frequency with which a single
+%% shard will be pushed. mem3_sync:push/2 is called on all shards in the
+%% *oldest* bucket roughly every mem3.sync_frequency milliseconds (see
+%% maybe_push_shards/1) to even out the load on mem3_sync.
+
+start_link() ->
+ couch_event_listener:start_link(?MODULE, [], [all_dbs]).
+
+init(_) ->
+ ok = subscribe_for_config(),
+ Delay = config:get_integer("mem3", "sync_delay", 5000),
+ Frequency = config:get_integer("mem3", "sync_frequency", 500),
+ Buckets = lists:duplicate(Delay div Frequency + 1, sets:new()),
+ St = #state{
+ nodes = mem3_sync:nodes_db(),
+ shards = mem3_sync:shards_db(),
+ users = mem3_sync:users_db(),
+ delay = Delay,
+ frequency = Frequency,
+ buckets = Buckets
+ },
+ {ok, St}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+handle_event(NodesDb, updated, #state{nodes = NodesDb} = St) ->
+ Nodes = mem3:nodes(),
+ Live = nodes(),
+ [mem3_sync:push(NodesDb, N) || N <- Nodes, lists:member(N, Live)],
+ maybe_push_shards(St);
+handle_event(ShardsDb, updated, #state{shards = ShardsDb} = St) ->
+ mem3_sync:push(ShardsDb, mem3_sync:find_next_node()),
+ maybe_push_shards(St);
+handle_event(UsersDb, updated, #state{users = UsersDb} = St) ->
+ mem3_sync:push(UsersDb, mem3_sync:find_next_node()),
+ maybe_push_shards(St);
+handle_event(<<"shards/", _/binary>> = ShardName, updated, St) ->
+ Buckets = bucket_shard(ShardName, St#state.buckets),
+ maybe_push_shards(St#state{buckets=Buckets});
+handle_event(<<"shards/", _:18/binary, _/binary>> = ShardName, deleted, St) ->
+ mem3_sync:remove_shard(ShardName),
+ maybe_push_shards(St);
+handle_event(_DbName, _Event, St) ->
+ maybe_push_shards(St).
+
+handle_cast({set_frequency, Frequency}, St) ->
+ #state{delay = Delay, buckets = Buckets0} = St,
+ Buckets1 = rebucket_shards(Delay, Frequency, Buckets0),
+ maybe_push_shards(St#state{frequency=Frequency, buckets=Buckets1});
+handle_cast({set_delay, Delay}, St) ->
+ #state{frequency = Frequency, buckets = Buckets0} = St,
+ Buckets1 = rebucket_shards(Delay, Frequency, Buckets0),
+ maybe_push_shards(St#state{delay=Delay, buckets=Buckets1});
+handle_cast(Msg, St) ->
+ couch_log:notice("unexpected cast to mem3_sync_event_listener: ~p", [Msg]),
+ maybe_push_shards(St).
+
+handle_info(timeout, St) ->
+ maybe_push_shards(St);
+handle_info({config_change, "mem3", "sync_delay", Value, _}, St) ->
+ set_config(set_delay, Value, "ignoring bad value for mem3.sync_delay"),
+ maybe_push_shards(St);
+handle_info({config_change, "mem3", "sync_frequency", Value, _}, St) ->
+ set_config(set_frequency, Value, "ignoring bad value for mem3.sync_frequency"),
+ maybe_push_shards(St);
+handle_info({gen_event_EXIT, _Handler, _Reason}, St) ->
+ erlang:send_after(?RELISTEN_DELAY, self(), restart_config_listener),
+ maybe_push_shards(St);
+handle_info(restart_config_listener, St) ->
+ ok = subscribe_for_config(),
+ maybe_push_shards(St);
+handle_info({get_state, Ref, Caller}, St) ->
+ Caller ! {Ref, St},
+ {ok, St};
+handle_info(Msg, St) ->
+ couch_log:notice("unexpected info to mem3_sync_event_listener: ~p", [Msg]),
+ maybe_push_shards(St).
+
+set_config(Cmd, Value, Error) ->
+ try list_to_integer(Value) of
+ IntegerValue ->
+ couch_event_listener:cast(self(), {Cmd, IntegerValue})
+ catch error:badarg ->
+ couch_log:warning("~s: ~p", [Error, Value])
+ end.
+
+bucket_shard(ShardName, [B|Bs]=Buckets0) ->
+ case waiting(ShardName, Buckets0) of
+ true -> Buckets0;
+ false -> [sets:add_element(ShardName, B)|Bs]
+ end.
+
+waiting(_, []) ->
+ false;
+waiting(ShardName, [B|Bs]) ->
+ case sets:is_element(ShardName, B) of
+ true -> true;
+ false -> waiting(ShardName, Bs)
+ end.
+
+rebucket_shards(Delay, Frequency, Buckets0) ->
+    case (Delay div Frequency + 1) - length(Buckets0) of
+        0 ->
+            Buckets0;
+        N when N < 0 ->
+            %% Reduce the number of buckets by merging the first abs(N) + 1
+            %% (i.e., the newest) together
+            {ToMerge, [B|Buckets1]} = lists:split(abs(N), Buckets0),
+            [sets:union([B|ToMerge])|Buckets1];
+        M ->
+            %% Extend the number of buckets by M
+            lists:duplicate(M, sets:new()) ++ Buckets0
+    end.
+
+%% To ensure that mem3_sync:push/2 is indeed called with roughly the frequency
+%% specified by #state.frequency, every message callback must return via a call
+%% to maybe_push_shards/1 rather than directly. All timing coordination - i.e.,
+%% calling mem3_sync:push/2 or setting a proper timeout to ensure that pending
+%% messages aren't dropped in case no further messages arrive - is handled here.
+maybe_push_shards(#state{last_push=undefined} = St) ->
+ {ok, St#state{last_push=os:timestamp()}, St#state.frequency};
+maybe_push_shards(St) ->
+ #state{frequency=Frequency, last_push=LastPush, buckets=Buckets0} = St,
+ Now = os:timestamp(),
+ Delta = timer:now_diff(Now, LastPush) div 1000,
+ case Delta > Frequency of
+ true ->
+ {Buckets1, [ToPush]} = lists:split(length(Buckets0) - 1, Buckets0),
+ Buckets2 = [sets:new()|Buckets1],
+ %% There's no sets:map/2!
+ sets:fold(
+ fun(ShardName, _) -> push_shard(ShardName) end,
+ undefined,
+ ToPush
+ ),
+ {ok, St#state{last_push=Now, buckets=Buckets2}, Frequency};
+ false ->
+ {ok, St, Frequency - Delta}
+ end.
+
+push_shard(ShardName) ->
+ try mem3_shards:for_shard_name(ShardName) of
+ Shards ->
+ Live = nodes(),
+ lists:foreach(
+ fun(#shard{node=N}) ->
+ case lists:member(N, Live) of
+ true -> mem3_sync:push(ShardName, N);
+ false -> ok
+ end
+ end,
+ Shards
+ )
+ catch error:database_does_not_exist ->
+ ok
+ end.
+
+subscribe_for_config() ->
+ config:subscribe_for_changes([
+ {"mem3", "sync_delay"},
+ {"mem3", "sync_frequency"}
+ ]).
+
+-ifdef(TEST).
+-include_lib("couch/include/couch_eunit.hrl").
+
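+%% Editorial sketch of rebucket_shards/3, not part of the original suite. It
+%% assumes the (Delay, Frequency) argument order used by the handle_cast
+%% clauses above: a delay of 1000ms with a frequency of 200ms yields
+%% 1000 div 200 + 1 = 6 buckets, and raising the frequency to 500ms merges
+%% the newest buckets down to 3.
+rebucket_shards_test() ->
+    Six = rebucket_shards(1000, 200, [sets:new()]),
+    ?assertEqual(6, length(Six)),
+    Merged = rebucket_shards(1000, 500, Six),
+    ?assertEqual(3, length(Merged)).
+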
+setup() ->
+ ok = meck:new(couch_event, [passthrough]),
+ ok = meck:expect(couch_event, register_all, ['_'], ok),
+
+ ok = meck:new(config_notifier, [passthrough]),
+ ok = meck:expect(config_notifier, handle_event, [
+ {[{'_', '_', "error", '_'}, '_'], meck:raise(throw, raised_error)},
+ {['_', '_'], meck:passthrough()}
+ ]),
+
+ application:start(config),
+ {ok, Pid} = ?MODULE:start_link(),
+ erlang:unlink(Pid),
+ meck:wait(config_notifier, subscribe, '_', 1000),
+ Pid.
+
+teardown(Pid) ->
+ exit(Pid, shutdown),
+ application:stop(config),
+ (catch meck:unload(couch_event)),
+ (catch meck:unload(config_notifier)),
+ ok.
+
+subscribe_for_config_test_() ->
+ {
+ "Subscrive for configuration changes",
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun should_set_sync_delay/1,
+ fun should_set_sync_frequency/1,
+ fun should_restart_listener/1,
+ fun should_terminate/1
+ ]
+ }
+ }.
+
+should_set_sync_delay(Pid) ->
+ ?_test(begin
+ config:set("mem3", "sync_delay", "123", false),
+ ?assertMatch(#state{delay = 123}, capture(Pid)),
+ ok
+ end).
+
+should_set_sync_frequency(Pid) ->
+ ?_test(begin
+ config:set("mem3", "sync_frequency", "456", false),
+ ?assertMatch(#state{frequency = 456}, capture(Pid)),
+ ok
+ end).
+
+should_restart_listener(_Pid) ->
+ ?_test(begin
+ meck:reset(config_notifier),
+ config:set("mem3", "sync_frequency", "error", false),
+
+ meck:wait(config_notifier, subscribe, '_', 1000),
+ ok
+ end).
+
+should_terminate(Pid) ->
+ ?_test(begin
+ ?assert(is_process_alive(Pid)),
+
+ EventMgr = whereis(config_event),
+
+ RestartFun = fun() -> exit(EventMgr, kill) end,
+ test_util:with_process_restart(config_event, RestartFun),
+
+ ?assertNot(is_process_alive(EventMgr)),
+ ?assertNot(is_process_alive(Pid)),
+ ?assert(is_process_alive(whereis(config_event))),
+ ok
+ end).
+
+capture(Pid) ->
+ Ref = make_ref(),
+ WaitFun = fun() ->
+ Pid ! {get_state, Ref, self()},
+ receive
+ {Ref, State} -> State
+ after 0 ->
+ wait
+ end
+ end,
+ test_util:wait(WaitFun).
+
+
+-endif.
diff --git a/src/mem3/src/mem3_sync_nodes.erl b/src/mem3/src/mem3_sync_nodes.erl
new file mode 100644
index 000000000..0a4bffcd2
--- /dev/null
+++ b/src/mem3/src/mem3_sync_nodes.erl
@@ -0,0 +1,115 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_sync_nodes).
+-behaviour(gen_server).
+-vsn(1).
+
+
+-export([start_link/0]).
+-export([add/1]).
+
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-export([monitor_sync/1]).
+
+
+-record(st, {
+ tid
+}).
+
+
+-record(job, {
+ nodes,
+ pid,
+ retry
+}).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+add(Nodes) ->
+ gen_server:cast(?MODULE, {add, Nodes}).
+
+
+init([]) ->
+ {ok, #st{
+ tid = ets:new(?MODULE, [set, protected, {keypos, #job.nodes}])
+ }}.
+
+
+terminate(_Reason, St) ->
+ [exit(Pid, kill) || #job{pid=Pid} <- ets:tab2list(St#st.tid)],
+ ok.
+
+
+handle_call(Msg, _From, St) ->
+ {stop, {invalid_call, Msg}, invalid_call, St}.
+
+
+handle_cast({add, Nodes}, #st{tid=Tid}=St) ->
+ case ets:lookup(Tid, Nodes) of
+ [] ->
+ Pid = start_sync(Nodes),
+ ets:insert(Tid, #job{nodes=Nodes, pid=Pid, retry=false});
+ [#job{retry=false}=Job] ->
+ ets:insert(Tid, Job#job{retry=true});
+ _ ->
+ ok
+ end,
+ {noreply, St};
+
+handle_cast(Msg, St) ->
+ {stop, {invalid_cast, Msg}, St}.
+
+
+handle_info({'DOWN', _, _, _, {sync_done, Nodes}}, #st{tid=Tid}=St) ->
+ case ets:lookup(Tid, Nodes) of
+ [#job{retry=true}=Job] ->
+ Pid = start_sync(Nodes),
+ ets:insert(Tid, Job#job{pid=Pid, retry=false});
+ _ ->
+ ets:delete(Tid, Nodes)
+ end,
+ {noreply, St};
+
+handle_info({'DOWN', _, _, _, {sync_error, Nodes}}, #st{tid=Tid}=St) ->
+ Pid = start_sync(Nodes),
+ ets:insert(Tid, #job{nodes=Nodes, pid=Pid, retry=false}),
+ {noreply, St};
+
+handle_info(Msg, St) ->
+ {stop, {invalid_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+start_sync(Nodes) ->
+ {Pid, _} = spawn_monitor(?MODULE, monitor_sync, [Nodes]),
+ Pid.
+
+
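+%% monitor_sync/1 runs in its own process (editorial note): it traps exits,
+%% links to the worker running mem3_sync:initial_sync/1, and translates the
+%% worker's exit into {sync_done, Nodes} or {sync_error, Nodes}, which the
+%% gen_server above receives as the reason in its 'DOWN' messages.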
+monitor_sync(Nodes) ->
+ process_flag(trap_exit, true),
+ Pid = spawn_link(mem3_sync, initial_sync, [Nodes]),
+ receive
+ {'EXIT', Pid, normal} ->
+ exit({sync_done, Nodes});
+ _ ->
+ exit({sync_error, Nodes})
+ end.
+
diff --git a/src/mem3/src/mem3_sync_security.erl b/src/mem3/src/mem3_sync_security.erl
new file mode 100644
index 000000000..9edd0ec57
--- /dev/null
+++ b/src/mem3/src/mem3_sync_security.erl
@@ -0,0 +1,107 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_sync_security).
+
+-export([maybe_sync/2, maybe_sync_int/2]).
+-export([go/0, go/1]).
+
+-include_lib("mem3/include/mem3.hrl").
+
+
+maybe_sync(#shard{}=Src, #shard{}=Dst) ->
+ case is_local(Src#shard.name) of
+ false ->
+ erlang:spawn(?MODULE, maybe_sync_int, [Src, Dst]);
+ true ->
+ ok
+ end.
+
+maybe_sync_int(#shard{name=Name}=Src, Dst) ->
+ DbName = mem3:dbname(Name),
+ case fabric:get_all_security(DbName, [{shards, [Src, Dst]}]) of
+ {ok, WorkerObjs} ->
+ Objs = [Obj || {_Worker, Obj} <- WorkerObjs],
+ case length(lists:usort(Objs)) of
+ 1 -> ok;
+ 2 -> go(DbName)
+ end;
+ {error, no_majority} ->
+ go(DbName);
+ Else ->
+ Args = [DbName, Else],
+ couch_log:error("Error checking security objects for ~s :: ~p", Args)
+ end.
+
+go() ->
+ {ok, Dbs} = fabric:all_dbs(),
+ lists:foreach(fun handle_db/1, Dbs).
+
+go(DbName) when is_binary(DbName) ->
+ handle_db(DbName).
+
+handle_db(DbName) ->
+ ShardCount = length(mem3:shards(DbName)),
+ case get_all_security(DbName) of
+ {ok, SecObjs} ->
+ case is_ok(SecObjs, ShardCount) of
+ ok ->
+ ok;
+ {fixable, SecObj} ->
+ couch_log:info("Sync security object for ~p: ~p", [DbName, SecObj]),
+ case fabric:set_security(DbName, SecObj) of
+ ok -> ok;
+ Error ->
+ couch_log:error("Error setting security object in ~p: ~p",
+ [DbName, Error])
+ end;
+ broken ->
+ couch_log:error("Bad security object in ~p: ~p", [DbName, SecObjs])
+ end;
+ Error ->
+ couch_log:error("Error getting security objects for ~p: ~p", [
+ DbName, Error])
+ end.
+
+get_all_security(DbName) ->
+ case fabric:get_all_security(DbName) of
+ {ok, SecObjs} ->
+ SecObjsDict = lists:foldl(fun({_, SO}, Acc) ->
+ dict:update_counter(SO, 1, Acc)
+ end, dict:new(), SecObjs),
+ {ok, dict:to_list(SecObjsDict)};
+ Error ->
+ Error
+ end.
+
+is_ok([_], _) ->
+ % One security object is the happy case
+ ok;
+is_ok([_, _] = SecObjs0, ShardCount) ->
+ % Figure out if we have a simple majority of security objects
+ % and if so, use that as the correct value. Otherwise we abort
+ % and rely on human intervention.
+ {Count, SecObj} = lists:max([{C, O} || {O, C} <- SecObjs0]),
+ case Count >= ((ShardCount div 2) + 1) of
+ true -> {fixable, SecObj};
+ false -> broken
+ end;
+is_ok(_, _) ->
+ % Anything else requires human intervention
+ broken.
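+
+%% Worked example (editorial): with ShardCount = 3 and two distinct security
+%% objects counted as [{ObjA, 2}, {ObjB, 1}], the majority threshold is
+%% 3 div 2 + 1 = 2, so is_ok/2 returns {fixable, ObjA}; a 1-1 split on a
+%% 2-shard db would return broken instead.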
+
+
+is_local(<<"shards/", _/binary>>) ->
+ false;
+is_local(_) ->
+ true.
+
diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl
new file mode 100644
index 000000000..2cd444d8c
--- /dev/null
+++ b/src/mem3/src/mem3_util.erl
@@ -0,0 +1,253 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_util).
+
+-export([hash/1, name_shard/2, create_partition_map/5, build_shards/2,
+ n_val/2, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1,
+ shard_info/1, ensure_exists/1, open_db_doc/1]).
+-export([is_deleted/1, rotate_list/2]).
+
+%% do not use outside mem3.
+-export([build_ordered_shards/2, downcast/1]).
+
+-export([create_partition_map/4, name_shard/1]).
+-deprecated({create_partition_map, 4, eventually}).
+-deprecated({name_shard, 1, eventually}).
+
+-define(RINGTOP, 2 bsl 31). % CRC32 space
+
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+hash(Item) when is_binary(Item) ->
+ erlang:crc32(Item);
+hash(Item) ->
+ erlang:crc32(term_to_binary(Item)).
+
+name_shard(Shard) ->
+ name_shard(Shard, "").
+
+name_shard(#shard{dbname = DbName, range=Range} = Shard, Suffix) ->
+ Name = make_name(DbName, Range, Suffix),
+ Shard#shard{name = ?l2b(Name)};
+
+name_shard(#ordered_shard{dbname = DbName, range=Range} = Shard, Suffix) ->
+ Name = make_name(DbName, Range, Suffix),
+ Shard#ordered_shard{name = ?l2b(Name)}.
+
+make_name(DbName, [B,E], Suffix) ->
+ ["shards/", couch_util:to_hex(<<B:32/integer>>), "-",
+ couch_util:to_hex(<<E:32/integer>>), "/", DbName, Suffix].
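+
+%% For example (illustrative values): make_name(<<"db1">>, [0, 16#3FFFFFFF],
+%% ".1491400000") builds the iolist for
+%% "shards/00000000-3fffffff/db1.1491400000".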
+
+create_partition_map(DbName, N, Q, Nodes) ->
+ create_partition_map(DbName, N, Q, Nodes, "").
+
+create_partition_map(DbName, N, Q, Nodes, Suffix) ->
+ UniqueShards = make_key_ranges((?RINGTOP) div Q, 0, []),
+ Shards0 = lists:flatten([lists:duplicate(N, S) || S <- UniqueShards]),
+ Shards1 = attach_nodes(Shards0, [], Nodes, []),
+ [name_shard(S#shard{dbname=DbName}, Suffix) || S <- Shards1].
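+
+%% Illustrative example (editorial): create_partition_map(<<"db1">>, 3, 4,
+%% [a, b, c]) splits the ring into four equal ranges and round-robins three
+%% copies of each across the nodes, yielding twelve #shard{} records with
+%% names like shards/00000000-3fffffff/db1.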
+
+make_key_ranges(_, CurrentPos, Acc) when CurrentPos >= ?RINGTOP ->
+ Acc;
+make_key_ranges(Increment, Start, Acc) ->
+ case Start + 2*Increment of
+ X when X > ?RINGTOP ->
+ End = ?RINGTOP - 1;
+ _ ->
+ End = Start + Increment - 1
+ end,
+ make_key_ranges(Increment, End+1, [#shard{range=[Start, End]} | Acc]).
+
+attach_nodes([], Acc, _, _) ->
+ lists:reverse(Acc);
+attach_nodes(Shards, Acc, [], UsedNodes) ->
+ attach_nodes(Shards, Acc, lists:reverse(UsedNodes), []);
+attach_nodes([S | Rest], Acc, [Node | Nodes], UsedNodes) ->
+ attach_nodes(Rest, [S#shard{node=Node} | Acc], Nodes, [Node | UsedNodes]).
+
+open_db_doc(DocId) ->
+ DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
+ {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
+ try couch_db:open_doc(Db, DocId, [ejson_body]) after couch_db:close(Db) end.
+
+write_db_doc(Doc) ->
+ DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
+ write_db_doc(DbName, Doc, true).
+
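+%% ShouldMutate guards against update loops: on a conflict we re-read
+%% once with ShouldMutate = false, so a concurrent writer that landed
+%% the same body counts as success and anything else is a conflict.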
+write_db_doc(DbName, #doc{id=Id, body=Body} = Doc, ShouldMutate) ->
+ {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
+ try couch_db:open_doc(Db, Id, [ejson_body]) of
+ {ok, #doc{body = Body}} ->
+ % the doc is already in the desired state; we're done here
+ ok;
+ {not_found, _} when ShouldMutate ->
+ try couch_db:update_doc(Db, Doc, []) of
+ {ok, _} ->
+ ok
+ catch conflict ->
+ % check to see if this was a replication race or a different edit
+ write_db_doc(DbName, Doc, false)
+ end;
+ _ ->
+ % the doc already exists in a different state
+ conflict
+ after
+ couch_db:close(Db)
+ end.
+
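+%% Evict the doc from the mem3_shards cache first, then delete every
+%% live revision leaf, with the same one-retry conflict handling as
+%% write_db_doc/3.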
+delete_db_doc(DocId) ->
+ gen_server:cast(mem3_shards, {cache_remove, DocId}),
+ DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
+ delete_db_doc(DbName, DocId, true).
+
+delete_db_doc(DbName, DocId, ShouldMutate) ->
+ {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
+ {ok, Revs} = couch_db:open_doc_revs(Db, DocId, all, []),
+ try [Doc#doc{deleted=true} || {ok, #doc{deleted=false}=Doc} <- Revs] of
+ [] ->
+ not_found;
+ Docs when ShouldMutate ->
+ try couch_db:update_docs(Db, Docs, []) of
+ {ok, _} ->
+ ok
+ catch conflict ->
+ % check to see if this was a replication race or if leaves survived
+ delete_db_doc(DbName, DocId, false)
+ end;
+ _ ->
+ % we have live leaves that we aren't allowed to delete, so bail
+ conflict
+ after
+ couch_db:close(Db)
+ end.
+
+%% Always returns original #shard records.
+-spec build_shards(binary(), list()) -> [#shard{}].
+build_shards(DbName, DocProps) ->
+ build_shards_by_node(DbName, DocProps).
+
+%% Will return #ordered_shard records if by_node and by_range
+%% are symmetrical, #shard records otherwise.
+-spec build_ordered_shards(binary(), list()) ->
+ [#shard{}] | [#ordered_shard{}].
+build_ordered_shards(DbName, DocProps) ->
+ ByNode = build_shards_by_node(DbName, DocProps),
+ ByRange = build_shards_by_range(DbName, DocProps),
+ Symmetrical = lists:sort(ByNode) =:= lists:sort(downcast(ByRange)),
+ case Symmetrical of
+ true -> ByRange;
+ false -> ByNode
+ end.
+
+build_shards_by_node(DbName, DocProps) ->
+ {ByNode} = couch_util:get_value(<<"by_node">>, DocProps, {[]}),
+ Suffix = couch_util:get_value(<<"shard_suffix">>, DocProps, ""),
+ lists:flatmap(fun({Node, Ranges}) ->
+ lists:map(fun(Range) ->
+ [B,E] = string:tokens(?b2l(Range), "-"),
+ Beg = httpd_util:hexlist_to_integer(B),
+ End = httpd_util:hexlist_to_integer(E),
+ name_shard(#shard{
+ dbname = DbName,
+ node = to_atom(Node),
+ range = [Beg, End]
+ }, Suffix)
+ end, Ranges)
+ end, ByNode).
+
+build_shards_by_range(DbName, DocProps) ->
+ {ByRange} = couch_util:get_value(<<"by_range">>, DocProps, {[]}),
+ Suffix = couch_util:get_value(<<"shard_suffix">>, DocProps, ""),
+ lists:flatmap(fun({Range, Nodes}) ->
+ lists:map(fun({Node, Order}) ->
+ [B,E] = string:tokens(?b2l(Range), "-"),
+ Beg = httpd_util:hexlist_to_integer(B),
+ End = httpd_util:hexlist_to_integer(E),
+ name_shard(#ordered_shard{
+ dbname = DbName,
+ node = to_atom(Node),
+ range = [Beg, End],
+ order = Order
+ }, Suffix)
+ end, lists:zip(Nodes, lists:seq(1, length(Nodes))))
+ end, ByRange).
+
+to_atom(Node) when is_binary(Node) ->
+ list_to_atom(binary_to_list(Node));
+to_atom(Node) when is_atom(Node) ->
+ Node.
+
+to_integer(N) when is_integer(N) ->
+ N;
+to_integer(N) when is_binary(N) ->
+ list_to_integer(binary_to_list(N));
+to_integer(N) when is_list(N) ->
+ list_to_integer(N).
+
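+%% Clamp the requested replica count to [1, NodeCount], falling back
+%% to the [cluster] n setting when undefined: n_val(2, 4) is 2,
+%% n_val(-1, 4) is 1, and n_val(6, 4) is capped (with a logged error)
+%% at 4 (see nval_test_/0).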
+n_val(undefined, NodeCount) ->
+ n_val(config:get("cluster", "n", "3"), NodeCount);
+n_val(N, NodeCount) when is_list(N) ->
+ n_val(list_to_integer(N), NodeCount);
+n_val(N, NodeCount) when is_integer(NodeCount), N > NodeCount ->
+ couch_log:error("Request to create N=~p DB but only ~p node(s)", [N, NodeCount]),
+ NodeCount;
+n_val(N, _) when N < 1 ->
+ 1;
+n_val(N, _) ->
+ N.
+
+shard_info(DbName) ->
+ [{n, mem3:n(DbName)},
+ {q, length(mem3:shards(DbName)) div mem3:n(DbName)}].
+
+ensure_exists(DbName) when is_list(DbName) ->
+ ensure_exists(list_to_binary(DbName));
+ensure_exists(DbName) ->
+ case couch_db:open(DbName, [nologifmissing, sys_db, ?ADMIN_CTX]) of
+ {ok, Db} ->
+ {ok, Db};
+ _ ->
+ couch_server:create(DbName, [?ADMIN_CTX])
+ end.
+
+
+is_deleted(Change) ->
+ case couch_util:get_value(<<"deleted">>, Change) of
+ undefined ->
+ % keep backwards compatibility for a while
+ couch_util:get_value(deleted, Change, false);
+ Else ->
+ Else
+ end.
+
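+%% Deterministically rotate List by crc32(Key) rem length(List), so a
+%% given key always sees the same ordering while different keys spread
+%% load across the copies.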
+rotate_list(_Key, []) ->
+ [];
+rotate_list(Key, List) when not is_binary(Key) ->
+ rotate_list(term_to_binary(Key), List);
+rotate_list(Key, List) ->
+ {H, T} = lists:split(erlang:crc32(Key) rem length(List), List),
+ T ++ H.
+
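+%% Strip the ordering information from #ordered_shard records,
+%% returning plain #shard records (the order field is dropped).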
+downcast(#shard{}=S) ->
+ S;
+downcast(#ordered_shard{}=S) ->
+ #shard{
+ name = S#ordered_shard.name,
+ node = S#ordered_shard.node,
+ dbname = S#ordered_shard.dbname,
+ range = S#ordered_shard.range,
+ ref = S#ordered_shard.ref
+ };
+downcast(Shards) when is_list(Shards) ->
+ [downcast(Shard) || Shard <- Shards].
diff --git a/src/mem3/test/01-config-default.ini b/src/mem3/test/01-config-default.ini
new file mode 100644
index 000000000..dde92ce2d
--- /dev/null
+++ b/src/mem3/test/01-config-default.ini
@@ -0,0 +1,14 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+[cluster]
+n=3
diff --git a/src/mem3/test/mem3_util_test.erl b/src/mem3/test/mem3_util_test.erl
new file mode 100644
index 000000000..340a58a63
--- /dev/null
+++ b/src/mem3/test/mem3_util_test.erl
@@ -0,0 +1,167 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_util_test).
+
+-include("mem3.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+hash_test() ->
+ ?assertEqual(1624516141, mem3_util:hash(0)),
+ ?assertEqual(3816901808, mem3_util:hash("0")),
+ ?assertEqual(3523407757, mem3_util:hash(<<0>>)),
+ ?assertEqual(4108050209, mem3_util:hash(<<"0">>)),
+ ?assertEqual(3094724072, mem3_util:hash(zero)),
+ ok.
+
+name_shard_test() ->
+ Shard1 = #shard{},
+ ?assertError(function_clause, mem3_util:name_shard(Shard1, ".1234")),
+
+ Shard2 = #shard{dbname = <<"testdb">>, range = [0,100]},
+ #shard{name=Name2} = mem3_util:name_shard(Shard2, ".1234"),
+ ?assertEqual(<<"shards/00000000-00000064/testdb.1234">>, Name2),
+
+ ok.
+
+create_partition_map_test() ->
+ {DbName1, N1, Q1, Nodes1} = {<<"testdb1">>, 3, 4, [a,b,c,d]},
+ Map1 = mem3_util:create_partition_map(DbName1, N1, Q1, Nodes1),
+ ?assertEqual(12, length(Map1)),
+
+ {DbName2, N2, Q2, Nodes2} = {<<"testdb2">>, 1, 1, [a,b,c,d]},
+ [#shard{name=Name2,node=Node2}] = Map2 =
+ mem3_util:create_partition_map(DbName2, N2, Q2, Nodes2, ".1234"),
+ ?assertEqual(1, length(Map2)),
+ ?assertEqual(<<"shards/00000000-ffffffff/testdb2.1234">>, Name2),
+ ?assertEqual(a, Node2),
+ ok.
+
+build_shards_test() ->
+ DocProps1 =
+ [{<<"changelog">>,
+ [[<<"add">>,<<"00000000-1fffffff">>,
+ <<"bigcouch@node.local">>],
+ [<<"add">>,<<"20000000-3fffffff">>,
+ <<"bigcouch@node.local">>],
+ [<<"add">>,<<"40000000-5fffffff">>,
+ <<"bigcouch@node.local">>],
+ [<<"add">>,<<"60000000-7fffffff">>,
+ <<"bigcouch@node.local">>],
+ [<<"add">>,<<"80000000-9fffffff">>,
+ <<"bigcouch@node.local">>],
+ [<<"add">>,<<"a0000000-bfffffff">>,
+ <<"bigcouch@node.local">>],
+ [<<"add">>,<<"c0000000-dfffffff">>,
+ <<"bigcouch@node.local">>],
+ [<<"add">>,<<"e0000000-ffffffff">>,
+ <<"bigcouch@node.local">>]]},
+ {<<"by_node">>,
+ {[{<<"bigcouch@node.local">>,
+ [<<"00000000-1fffffff">>,<<"20000000-3fffffff">>,
+ <<"40000000-5fffffff">>,<<"60000000-7fffffff">>,
+ <<"80000000-9fffffff">>,<<"a0000000-bfffffff">>,
+ <<"c0000000-dfffffff">>,<<"e0000000-ffffffff">>]}]}},
+ {<<"by_range">>,
+ {[{<<"00000000-1fffffff">>,[<<"bigcouch@node.local">>]},
+ {<<"20000000-3fffffff">>,[<<"bigcouch@node.local">>]},
+ {<<"40000000-5fffffff">>,[<<"bigcouch@node.local">>]},
+ {<<"60000000-7fffffff">>,[<<"bigcouch@node.local">>]},
+ {<<"80000000-9fffffff">>,[<<"bigcouch@node.local">>]},
+ {<<"a0000000-bfffffff">>,[<<"bigcouch@node.local">>]},
+ {<<"c0000000-dfffffff">>,[<<"bigcouch@node.local">>]},
+ {<<"e0000000-ffffffff">>,[<<"bigcouch@node.local">>]}]}}],
+ Shards1 = mem3_util:build_shards(<<"testdb1">>, DocProps1),
+ ExpectedShards1 =
+ [{shard,<<"shards/00000000-1fffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [0,536870911],
+ undefined},
+ {shard,<<"shards/20000000-3fffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [536870912,1073741823],
+ undefined},
+ {shard,<<"shards/40000000-5fffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [1073741824,1610612735],
+ undefined},
+ {shard,<<"shards/60000000-7fffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [1610612736,2147483647],
+ undefined},
+ {shard,<<"shards/80000000-9fffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [2147483648,2684354559],
+ undefined},
+ {shard,<<"shards/a0000000-bfffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [2684354560,3221225471],
+ undefined},
+ {shard,<<"shards/c0000000-dfffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [3221225472,3758096383],
+ undefined},
+ {shard,<<"shards/e0000000-ffffffff/testdb1">>,
+ 'bigcouch@node.local',<<"testdb1">>,
+ [3758096384,4294967295],
+ undefined}],
+ ?assertEqual(ExpectedShards1, Shards1),
+ ok.
+
+
+%% n_val tests
+
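+%% couch_log is mecked out here so the error that n_val/2 logs when
+%% N exceeds the node count does not require a running logger.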
+nval_test_() ->
+ {"n_val tests explicit",
+ [
+ {setup,
+ fun () ->
+ meck:new([couch_log]),
+ meck:expect(couch_log, error, fun(_, _) -> ok end),
+ ok
+ end,
+ fun (_) -> meck:unload([couch_log]) end,
+ [
+ ?_assertEqual(2, mem3_util:n_val(2,4)),
+ ?_assertEqual(1, mem3_util:n_val(-1,4)),
+ ?_assertEqual(4, mem3_util:n_val(6,4))
+ ]
+ }
+ ]
+ }.
+
+
+config_01_setup() ->
+ Ini = filename:join([code:lib_dir(mem3, test), "01-config-default.ini"]),
+ {ok, Pid} = config:start_link([Ini]),
+ Pid.
+
+config_teardown(_Pid) ->
+ config:stop().
+
+
+n_val_test_() ->
+ {"n_val tests with config",
+ [
+ {setup,
+ fun config_01_setup/0,
+ fun config_teardown/1,
+ fun(Pid) ->
+ {with, Pid, [
+ fun n_val_1/1
+ ]}
+ end}
+ ]
+ }.
+
+n_val_1(_Pid) ->
+ ?assertEqual(3, mem3_util:n_val(undefined, 4)).