diff options
author | Jan Lehnardt <jan@apache.org> | 2017-04-01 19:38:53 +0200 |
---|---|---|
committer | Jan Lehnardt <jan@apache.org> | 2017-04-01 19:38:53 +0200 |
commit | 81ad7005d01fbc03bd7573aaa97f0b44c137e6c3 (patch) | |
tree | 67a5072bb6df37cd545b40955b40bfca03e6edf7 | |
parent | 8f39d4191040e7519a573ce1b3cdf5b674bf8e4f (diff) | |
parent | c3c5429180de14a2b139f7741c934143ef73988c (diff) | |
download | couchdb-81ad7005d01fbc03bd7573aaa97f0b44c137e6c3.tar.gz |
Add 'src/mem3/' from commit 'c3c5429180de14a2b139f7741c934143ef73988c'
git-subtree-dir: src/mem3
git-subtree-mainline: 8f39d4191040e7519a573ce1b3cdf5b674bf8e4f
git-subtree-split: c3c5429180de14a2b139f7741c934143ef73988c
-rw-r--r-- | src/mem3/LICENSE | 202 | ||||
-rw-r--r-- | src/mem3/README.md | 43 | ||||
-rw-r--r-- | src/mem3/include/mem3.hrl | 52 | ||||
-rw-r--r-- | src/mem3/priv/stats_descriptions.cfg | 12 | ||||
-rw-r--r-- | src/mem3/src/mem3.app.src | 53 | ||||
-rw-r--r-- | src/mem3/src/mem3.erl | 308 | ||||
-rw-r--r-- | src/mem3/src/mem3_app.erl | 21 | ||||
-rw-r--r-- | src/mem3/src/mem3_epi.erl | 50 | ||||
-rw-r--r-- | src/mem3/src/mem3_httpd.erl | 66 | ||||
-rw-r--r-- | src/mem3/src/mem3_httpd_handlers.erl | 23 | ||||
-rw-r--r-- | src/mem3/src/mem3_nodes.erl | 146 | ||||
-rw-r--r-- | src/mem3/src/mem3_rep.erl | 481 | ||||
-rw-r--r-- | src/mem3/src/mem3_rpc.erl | 586 | ||||
-rw-r--r-- | src/mem3/src/mem3_shards.erl | 419 | ||||
-rw-r--r-- | src/mem3/src/mem3_sup.erl | 35 | ||||
-rw-r--r-- | src/mem3/src/mem3_sync.erl | 319 | ||||
-rw-r--r-- | src/mem3/src/mem3_sync_event.erl | 86 | ||||
-rw-r--r-- | src/mem3/src/mem3_sync_event_listener.erl | 309 | ||||
-rw-r--r-- | src/mem3/src/mem3_sync_nodes.erl | 115 | ||||
-rw-r--r-- | src/mem3/src/mem3_sync_security.erl | 107 | ||||
-rw-r--r-- | src/mem3/src/mem3_util.erl | 253 | ||||
-rw-r--r-- | src/mem3/test/01-config-default.ini | 14 | ||||
-rw-r--r-- | src/mem3/test/mem3_util_test.erl | 167 |
23 files changed, 3867 insertions, 0 deletions
diff --git a/src/mem3/LICENSE b/src/mem3/LICENSE new file mode 100644 index 000000000..f6cd2bc80 --- /dev/null +++ b/src/mem3/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. 
Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative 
Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/src/mem3/README.md b/src/mem3/README.md new file mode 100644 index 000000000..1e1e0bd2c --- /dev/null +++ b/src/mem3/README.md @@ -0,0 +1,43 @@ +## mem3 + +Mem3 is the node membership application for clustered [CouchDB][1]. It is used +in CouchDB since version 2.0 and tracks two very important things for the +cluster: + + 1. member nodes + 2. node/shards mappings for each database + +Both the nodes and shards are tracked in node-local couch databases. Shards +are heavily used, so an ETS cache is also maintained for low-latency lookups. +The nodes and shards are synchronized via continuous CouchDB replication, +which serves as 'gossip' in Dynamo parlance. The shards ETS cache is kept in +sync based on membership and database event listeners. 
+ +A very important point to make here is that CouchDB does not necessarily +divide up each database into equal shards across the nodes of a cluster. For +instance, in a 20-node cluster, you may have the need to create a small +database with very few documents. For efficiency reasons, you may create your +database with Q=4 and keep the default of N=3. This means you only have 12 +shards total, so 8 nodes will hold none of the data for this database. Given +this feature, we even shard use out across the cluster by altering the 'start' +node for the database's shards. + +Splitting and merging shards is an immature feature of the system, and will +require attention in the near-term. We believe we can implement both +functions and perform them while the database remains online. + +### Getting Started + +Mem3 requires R13B03 or higher and can be built with [rebar][2], which comes +bundled in the repository. Rebar needs to be able to find the `couch_db.hrl` +header file; one way to accomplish this is to set ERL_LIBS to point to the +apps subdirectory of a CouchDB checkout, e.g. + + ERL_LIBS="/usr/local/src/couchdb/apps" ./rebar compile + +### License +[Apache 2.0][3] + +[1]: http://couchdb.apache.org +[2]: http://github.com/rebar/rebar +[3]: http://www.apache.org/licenses/LICENSE-2.0.html diff --git a/src/mem3/include/mem3.hrl b/src/mem3/include/mem3.hrl new file mode 100644 index 000000000..d6ac0bed2 --- /dev/null +++ b/src/mem3/include/mem3.hrl @@ -0,0 +1,52 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +% License for the specific language governing permissions and limitations under +% the License. + +% type specification hacked to suppress dialyzer warning re: match spec +-record(shard, { + name :: binary() | '_', + node :: node() | '_', + dbname :: binary(), + range :: [non_neg_integer() | '$1' | '$2'] | '_', + ref :: reference() | 'undefined' | '_' +}). + +%% Do not reference outside of mem3. +-record(ordered_shard, { + name :: binary() | '_', + node :: node() | '_', + dbname :: binary(), + range :: [non_neg_integer() | '$1' | '$2'] | '_', + ref :: reference() | 'undefined' | '_', + order :: non_neg_integer() | 'undefined' | '_' +}). + +%% types +-type join_type() :: init | join | replace | leave. +-type join_order() :: non_neg_integer(). +-type options() :: list(). +-type mem_node() :: {join_order(), node(), options()}. +-type mem_node_list() :: [mem_node()]. +-type arg_options() :: {test, boolean()}. +-type args() :: [] | [arg_options()]. +-type test() :: undefined | node(). +-type epoch() :: float(). +-type clock() :: {node(), epoch()}. +-type vector_clock() :: [clock()]. +-type ping_node() :: node() | nil. +-type gossip_fun() :: call | cast. + +-type part() :: #shard{}. +-type fullmap() :: [part()]. +-type ref_part_map() :: {reference(), part()}. +-type tref() :: reference(). +-type np() :: {node(), part()}. +-type beg_acc() :: [integer()]. diff --git a/src/mem3/priv/stats_descriptions.cfg b/src/mem3/priv/stats_descriptions.cfg new file mode 100644 index 000000000..569d16ac3 --- /dev/null +++ b/src/mem3/priv/stats_descriptions.cfg @@ -0,0 +1,12 @@ +{[mem3, shard_cache, eviction], [ + {type, counter}, + {desc, <<"number of shard cache evictions">>} +]}. +{[mem3, shard_cache, hit], [ + {type, counter}, + {desc, <<"number of shard cache hits">>} +]}. +{[mem3, shard_cache, miss], [ + {type, counter}, + {desc, <<"number of shard cache misses">>} +]}. 
diff --git a/src/mem3/src/mem3.app.src b/src/mem3/src/mem3.app.src new file mode 100644 index 000000000..99a9eed88 --- /dev/null +++ b/src/mem3/src/mem3.app.src @@ -0,0 +1,53 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +{application, mem3, [ + {description, "CouchDB Cluster Membership"}, + {vsn, git}, + {modules, [ + mem3, + mem3_app, + mem3_httpd, + mem3_nodes, + mem3_rep, + mem3_shards, + mem3_sup, + mem3_sync, + mem3_sync_event, + mem3_sync_nodes, + mem3_sync_security, + mem3_util + ]}, + {mod, {mem3_app, []}}, + {registered, [ + mem3_events, + mem3_nodes, + mem3_shards, + mem3_sync, + mem3_sync_nodes, + mem3_sup + ]}, + {applications, [ + kernel, + stdlib, + config, + sasl, + crypto, + mochiweb, + couch_epi, + couch, + rexi, + couch_log, + couch_event, + couch_stats + ]} +]}. diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl new file mode 100644 index 000000000..405d7e5fa --- /dev/null +++ b/src/mem3/src/mem3.erl @@ -0,0 +1,308 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3). + +-export([start/0, stop/0, restart/0, nodes/0, node_info/2, shards/1, shards/2, + choose_shards/2, n/1, n/2, dbname/1, ushards/1]). +-export([get_shard/3, local_shards/1, shard_suffix/1, fold_shards/2]). +-export([sync_security/0, sync_security/1]). +-export([compare_nodelists/0, compare_shards/1]). +-export([quorum/1, group_by_proximity/1]). +-export([live_shards/2]). +-export([belongs/2]). +-export([get_placement/1]). + +%% For mem3 use only. +-export([name/1, node/1, range/1]). + +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +start() -> + application:start(mem3). + +stop() -> + application:stop(mem3). + +restart() -> + stop(), + start(). + +%% @doc Detailed report of cluster-wide membership state. Queries the state +%% on all member nodes and builds a dictionary with unique states as the +%% key and the nodes holding that state as the value. Also reports member +%% nodes which fail to respond and nodes which are connected but are not +%% cluster members. Useful for debugging. +-spec compare_nodelists() -> [{{cluster_nodes, [node()]} | bad_nodes + | non_member_nodes, [node()]}]. +compare_nodelists() -> + Nodes = mem3:nodes(), + AllNodes = erlang:nodes([this, visible]), + {Replies, BadNodes} = gen_server:multi_call(Nodes, mem3_nodes, get_nodelist), + Dict = lists:foldl(fun({Node, Nodelist}, D) -> + orddict:append({cluster_nodes, Nodelist}, Node, D) + end, orddict:new(), Replies), + [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict]. + +-spec compare_shards(DbName::iodata()) -> [{bad_nodes | [#shard{}], [node()]}]. 
+compare_shards(DbName) when is_list(DbName) -> + compare_shards(list_to_binary(DbName)); +compare_shards(DbName) -> + Nodes = mem3:nodes(), + {Replies, BadNodes} = rpc:multicall(mem3, shards, [DbName]), + GoodNodes = [N || N <- Nodes, not lists:member(N, BadNodes)], + Dict = lists:foldl(fun({Shards, Node}, D) -> + orddict:append(Shards, Node, D) + end, orddict:new(), lists:zip(Replies, GoodNodes)), + [{bad_nodes, BadNodes} | Dict]. + +-spec n(DbName::iodata()) -> integer(). +n(DbName) -> + n(DbName, <<"foo">>). + +n(DbName, DocId) -> + length(mem3:shards(DbName, DocId)). + +-spec nodes() -> [node()]. +nodes() -> + mem3_nodes:get_nodelist(). + +node_info(Node, Key) -> + mem3_nodes:get_node_info(Node, Key). + +-spec shards(DbName::iodata()) -> [#shard{}]. +shards(DbName) -> + shards_int(DbName, []). + +shards_int(DbName, Options) when is_list(DbName) -> + shards_int(list_to_binary(DbName), Options); +shards_int(DbName, Options) -> + Ordered = lists:member(ordered, Options), + ShardDbName = + list_to_binary(config:get("mem3", "shards_db", "_dbs")), + case DbName of + ShardDbName when Ordered -> + %% shard_db is treated as a single sharded db to support calls to db_info + %% and view_all_docs + [#ordered_shard{ + node = node(), + name = ShardDbName, + dbname = ShardDbName, + range = [0, (2 bsl 31)-1], + order = undefined}]; + ShardDbName -> + %% shard_db is treated as a single sharded db to support calls to db_info + %% and view_all_docs + [#shard{ + node = node(), + name = ShardDbName, + dbname = ShardDbName, + range = [0, (2 bsl 31)-1]}]; + _ -> + mem3_shards:for_db(DbName, Options) + end. + +-spec shards(DbName::iodata(), DocId::binary()) -> [#shard{}]. +shards(DbName, DocId) -> + shards_int(DbName, DocId, []). 
+ +shards_int(DbName, DocId, Options) when is_list(DbName) -> + shards_int(list_to_binary(DbName), DocId, Options); +shards_int(DbName, DocId, Options) when is_list(DocId) -> + shards_int(DbName, list_to_binary(DocId), Options); +shards_int(DbName, DocId, Options) -> + mem3_shards:for_docid(DbName, DocId, Options). + + +-spec ushards(DbName::iodata()) -> [#shard{}]. +ushards(DbName) -> + Nodes = [node()|erlang:nodes()], + ZoneMap = zone_map(Nodes), + Shards = ushards(DbName, live_shards(DbName, Nodes, [ordered]), ZoneMap), + mem3_util:downcast(Shards). + +ushards(DbName, Shards0, ZoneMap) -> + {L,S,D} = group_by_proximity(Shards0, ZoneMap), + % Prefer shards in the local zone over shards in a different zone, + % but sort each zone separately to ensure a consistent choice between + % nodes in the same zone. + Shards = choose_ushards(DbName, L ++ S) ++ choose_ushards(DbName, D), + lists:ukeysort(#shard.range, Shards). + +get_shard(DbName, Node, Range) -> + mem3_shards:get(DbName, Node, Range). + +local_shards(DbName) -> + mem3_shards:local(DbName). + +shard_suffix(#db{name=DbName}) -> + shard_suffix(DbName); +shard_suffix(DbName0) -> + Shard = hd(shards(DbName0)), + <<"shards/", _:8/binary, "-", _:8/binary, "/", DbName/binary>> = + Shard#shard.name, + filename:extension(binary_to_list(DbName)). + +fold_shards(Fun, Acc) -> + mem3_shards:fold(Fun, Acc). + +sync_security() -> + mem3_sync_security:go(). + +sync_security(Db) -> + mem3_sync_security:go(dbname(Db)). + +-spec choose_shards(DbName::iodata(), Options::list()) -> [#shard{}]. 
+choose_shards(DbName, Options) when is_list(DbName) -> + choose_shards(list_to_binary(DbName), Options); +choose_shards(DbName, Options) -> + try shards(DbName) + catch error:E when E==database_does_not_exist; E==badarg -> + Nodes = allowed_nodes(), + case get_placement(Options) of + undefined -> + choose_shards(DbName, Nodes, Options); + Placement -> + lists:flatmap(fun({Zone, N}) -> + NodesInZone = nodes_in_zone(Nodes, Zone), + Options1 = lists:keymerge(1, [{n,N}], Options), + choose_shards(DbName, NodesInZone, Options1) + end, Placement) + end + end. + +choose_shards(DbName, Nodes, Options) -> + NodeCount = length(Nodes), + Suffix = couch_util:get_value(shard_suffix, Options, ""), + N = mem3_util:n_val(couch_util:get_value(n, Options), NodeCount), + if N =:= 0 -> erlang:error(no_nodes_in_zone); + true -> ok + end, + Q = mem3_util:to_integer(couch_util:get_value(q, Options, + config:get("cluster", "q", "8"))), + %% rotate to a random entry in the nodelist for even distribution + {A, B} = lists:split(crypto:rand_uniform(1,length(Nodes)+1), Nodes), + RotatedNodes = B ++ A, + mem3_util:create_partition_map(DbName, N, Q, RotatedNodes, Suffix). + +get_placement(Options) -> + case couch_util:get_value(placement, Options) of + undefined -> + case config:get("cluster", "placement") of + undefined -> + undefined; + PlacementStr -> + decode_placement_string(PlacementStr) + end; + PlacementStr -> + decode_placement_string(PlacementStr) + end. + +decode_placement_string(PlacementStr) -> + [begin + [Zone, N] = string:tokens(Rule, ":"), + {list_to_binary(Zone), list_to_integer(N)} + end || Rule <- string:tokens(PlacementStr, ",")]. + +-spec dbname(#shard{} | iodata()) -> binary(). 
+dbname(#shard{dbname = DbName}) -> + DbName; +dbname(<<"shards/", _:8/binary, "-", _:8/binary, "/", DbName/binary>>) -> + list_to_binary(filename:rootname(binary_to_list(DbName))); +dbname(DbName) when is_list(DbName) -> + dbname(list_to_binary(DbName)); +dbname(DbName) when is_binary(DbName) -> + DbName; +dbname(_) -> + erlang:error(badarg). + +%% @doc Determine if DocId belongs in shard (identified by record or filename) +belongs(#shard{}=Shard, DocId) when is_binary(DocId) -> + [Begin, End] = range(Shard), + belongs(Begin, End, DocId); +belongs(<<"shards/", _/binary>> = ShardName, DocId) when is_binary(DocId) -> + [Begin, End] = range(ShardName), + belongs(Begin, End, DocId); +belongs(DbName, DocId) when is_binary(DbName), is_binary(DocId) -> + true. + +belongs(Begin, End, DocId) -> + HashKey = mem3_util:hash(DocId), + Begin =< HashKey andalso HashKey =< End. + +range(#shard{range = Range}) -> + Range; +range(#ordered_shard{range = Range}) -> + Range; +range(<<"shards/", Start:8/binary, "-", End:8/binary, "/", _/binary>>) -> + [httpd_util:hexlist_to_integer(binary_to_list(Start)), + httpd_util:hexlist_to_integer(binary_to_list(End))]. + +allowed_nodes() -> + [Node || Node <- mem3:nodes(), mem3:node_info(Node, <<"decom">>) =/= true]. + +nodes_in_zone(Nodes, Zone) -> + [Node || Node <- Nodes, Zone == mem3:node_info(Node, <<"zone">>)]. + +live_shards(DbName, Nodes) -> + live_shards(DbName, Nodes, []). + +live_shards(DbName, Nodes, Options) -> + [S || S <- shards_int(DbName, Options), lists:member(mem3:node(S), Nodes)]. + +zone_map(Nodes) -> + [{Node, node_info(Node, <<"zone">>)} || Node <- Nodes]. + +group_by_proximity(Shards) -> + Nodes = [mem3:node(S) || S <- lists:ukeysort(#shard.node, Shards)], + group_by_proximity(Shards, zone_map(Nodes)). 
+ +group_by_proximity(Shards, ZoneMap) -> + {Local, Remote} = lists:partition(fun(S) -> mem3:node(S) =:= node() end, + Shards), + LocalZone = proplists:get_value(node(), ZoneMap), + Fun = fun(S) -> proplists:get_value(mem3:node(S), ZoneMap) =:= LocalZone end, + {SameZone, DifferentZone} = lists:partition(Fun, Remote), + {Local, SameZone, DifferentZone}. + +choose_ushards(DbName, Shards) -> + Groups0 = group_by_range(Shards), + Groups1 = [mem3_util:rotate_list({DbName, R}, order_shards(G)) + || {R, G} <- Groups0], + [hd(G) || G <- Groups1]. + +order_shards([#ordered_shard{}|_]=OrderedShards) -> + lists:keysort(#ordered_shard.order, OrderedShards); +order_shards(UnorderedShards) -> + UnorderedShards. + +group_by_range(Shards) -> + lists:foldl(fun(Shard, Dict) -> + orddict:append(mem3:range(Shard), Shard, Dict) end, orddict:new(), Shards). + +% quorum functions + +quorum(#db{name=DbName}) -> + quorum(DbName); +quorum(DbName) -> + n(DbName) div 2 + 1. + +node(#shard{node=Node}) -> + Node; +node(#ordered_shard{node=Node}) -> + Node. + +name(#shard{name=Name}) -> + Name; +name(#ordered_shard{name=Name}) -> + Name. diff --git a/src/mem3/src/mem3_app.erl b/src/mem3/src/mem3_app.erl new file mode 100644 index 000000000..3ddfbe6fd --- /dev/null +++ b/src/mem3/src/mem3_app.erl @@ -0,0 +1,21 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_app). +-behaviour(application). +-export([start/2, stop/1]). + +start(_Type, []) -> + mem3_sup:start_link(). 
+ +stop([]) -> + ok. diff --git a/src/mem3/src/mem3_epi.erl b/src/mem3/src/mem3_epi.erl new file mode 100644 index 000000000..ebcd596b6 --- /dev/null +++ b/src/mem3/src/mem3_epi.erl @@ -0,0 +1,50 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +-module(mem3_epi). + +-behaviour(couch_epi_plugin). + +-export([ + app/0, + providers/0, + services/0, + data_subscriptions/0, + data_providers/0, + processes/0, + notify/3 +]). + +app() -> + mem3. + +providers() -> + [ + {chttpd_handlers, mem3_httpd_handlers} + ]. + + +services() -> + []. + +data_subscriptions() -> + []. + +data_providers() -> + []. + +processes() -> + []. + +notify(_Key, _Old, _New) -> + ok. diff --git a/src/mem3/src/mem3_httpd.erl b/src/mem3/src/mem3_httpd.erl new file mode 100644 index 000000000..535815862 --- /dev/null +++ b/src/mem3/src/mem3_httpd.erl @@ -0,0 +1,66 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_httpd). + +-export([handle_membership_req/1, handle_shards_req/2]). 
+ +%% includes +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + + +handle_membership_req(#httpd{method='GET', + path_parts=[<<"_membership">>]} = Req) -> + ClusterNodes = try mem3:nodes() + catch _:_ -> {ok,[]} end, + couch_httpd:send_json(Req, {[ + {all_nodes, lists:sort([node()|nodes()])}, + {cluster_nodes, lists:sort(ClusterNodes)} + ]}); +handle_membership_req(#httpd{path_parts=[<<"_membership">>]}=Req) -> + chttpd:send_method_not_allowed(Req, "GET"). + +handle_shards_req(#httpd{method='GET', + path_parts=[_DbName, <<"_shards">>]} = Req, Db) -> + DbName = mem3:dbname(Db#db.name), + Shards = mem3:shards(DbName), + JsonShards = json_shards(Shards, dict:new()), + couch_httpd:send_json(Req, {[ + {shards, JsonShards} + ]}); +handle_shards_req(#httpd{method='GET', + path_parts=[_DbName, <<"_shards">>, DocId]} = Req, Db) -> + DbName = mem3:dbname(Db#db.name), + Shards = mem3:shards(DbName, DocId), + {[{Shard, Dbs}]} = json_shards(Shards, dict:new()), + couch_httpd:send_json(Req, {[ + {range, Shard}, + {nodes, Dbs} + ]}); +handle_shards_req(#httpd{path_parts=[_DbName, <<"_shards">>]}=Req, _Db) -> + chttpd:send_method_not_allowed(Req, "GET"); +handle_shards_req(#httpd{path_parts=[_DbName, <<"_shards">>, _DocId]}=Req, _Db) -> + chttpd:send_method_not_allowed(Req, "GET"). + +%% +%% internal +%% + +json_shards([], AccIn) -> + List = dict:to_list(AccIn), + {lists:sort(List)}; +json_shards([#shard{node=Node, range=[B,E]} | Rest], AccIn) -> + HexBeg = couch_util:to_hex(<<B:32/integer>>), + HexEnd = couch_util:to_hex(<<E:32/integer>>), + Range = list_to_binary(HexBeg ++ "-" ++ HexEnd), + json_shards(Rest, dict:append(Range, Node, AccIn)). 
diff --git a/src/mem3/src/mem3_httpd_handlers.erl b/src/mem3/src/mem3_httpd_handlers.erl new file mode 100644 index 000000000..d8e138c15 --- /dev/null +++ b/src/mem3/src/mem3_httpd_handlers.erl @@ -0,0 +1,23 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_httpd_handlers). + +-export([url_handler/1, db_handler/1, design_handler/1]). + +url_handler(<<"_membership">>) -> fun mem3_httpd:handle_membership_req/1; +url_handler(_) -> no_match. + +db_handler(<<"_shards">>) -> fun mem3_httpd:handle_shards_req/2; +db_handler(_) -> no_match. + +design_handler(_) -> no_match. diff --git a/src/mem3/src/mem3_nodes.erl b/src/mem3/src/mem3_nodes.erl new file mode 100644 index 000000000..f31891a7b --- /dev/null +++ b/src/mem3/src/mem3_nodes.erl @@ -0,0 +1,146 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_nodes). +-behaviour(gen_server). +-vsn(1). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). 
+ +-export([start_link/0, get_nodelist/0, get_node_info/2]). + +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-record(state, {changes_pid, update_seq}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +get_nodelist() -> + try + lists:sort([N || {N,_} <- ets:tab2list(?MODULE)]) + catch error:badarg -> + gen_server:call(?MODULE, get_nodelist) + end. + +get_node_info(Node, Key) -> + try + couch_util:get_value(Key, ets:lookup_element(?MODULE, Node, 2)) + catch error:badarg -> + gen_server:call(?MODULE, {get_node_info, Node, Key}) + end. + +init([]) -> + ets:new(?MODULE, [named_table, {read_concurrency, true}]), + UpdateSeq = initialize_nodelist(), + {Pid, _} = spawn_monitor(fun() -> listen_for_changes(UpdateSeq) end), + {ok, #state{changes_pid = Pid, update_seq = UpdateSeq}}. + +handle_call(get_nodelist, _From, State) -> + {reply, lists:sort([N || {N,_} <- ets:tab2list(?MODULE)]), State}; +handle_call({get_node_info, Node, Key}, _From, State) -> + Resp = try + couch_util:get_value(Key, ets:lookup_element(?MODULE, Node, 2)) + catch error:badarg -> + error + end, + {reply, Resp, State}; +handle_call({add_node, Node, NodeInfo}, _From, State) -> + gen_event:notify(mem3_events, {add_node, Node}), + ets:insert(?MODULE, {Node, NodeInfo}), + {reply, ok, State}; +handle_call({remove_node, Node}, _From, State) -> + gen_event:notify(mem3_events, {remove_node, Node}), + ets:delete(?MODULE, Node), + {reply, ok, State}; +handle_call(_Call, _From, State) -> + {noreply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. 
+ +handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) -> + couch_log:notice("~p changes listener died ~p", [?MODULE, Reason]), + StartSeq = State#state.update_seq, + Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> StartSeq end, + erlang:send_after(5000, self(), start_listener), + {noreply, State#state{update_seq = Seq}}; +handle_info(start_listener, #state{update_seq = Seq} = State) -> + {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end), + {noreply, State#state{changes_pid=NewPid}}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, #state{}=State, _Extra) -> + {ok, State}. + +%% internal functions + +initialize_nodelist() -> + DbName = config:get("mem3", "nodes_db", "_nodes"), + {ok, Db} = mem3_util:ensure_exists(DbName), + {ok, _, Db} = couch_btree:fold(Db#db.id_tree, fun first_fold/3, Db, []), + % add self if not already present + case ets:lookup(?MODULE, node()) of + [_] -> + ok; + [] -> + ets:insert(?MODULE, {node(), []}), + Doc = #doc{id = couch_util:to_binary(node())}, + {ok, _} = couch_db:update_doc(Db, Doc, []) + end, + couch_db:close(Db), + Db#db.update_seq. + +first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, _, Acc) -> + {ok, Acc}; +first_fold(#full_doc_info{deleted=true}, _, Acc) -> + {ok, Acc}; +first_fold(#full_doc_info{id=Id}=DocInfo, _, Db) -> + {ok, #doc{body={Props}}} = couch_db:open_doc(Db, DocInfo, [ejson_body]), + ets:insert(?MODULE, {mem3_util:to_atom(Id), Props}), + {ok, Db}. + +listen_for_changes(Since) -> + DbName = config:get("mem3", "nodes_db", "_nodes"), + {ok, Db} = mem3_util:ensure_exists(DbName), + Args = #changes_args{ + feed = "continuous", + since = Since, + heartbeat = true, + include_docs = true + }, + ChangesFun = couch_changes:handle_db_changes(Args, nil, Db), + ChangesFun(fun changes_callback/2). 
+ +changes_callback(start, _) -> + {ok, nil}; +changes_callback({stop, EndSeq}, _) -> + exit({seq, EndSeq}); +changes_callback({change, {Change}, _}, _) -> + Node = couch_util:get_value(<<"id">>, Change), + case Node of <<"_design/", _/binary>> -> ok; _ -> + case mem3_util:is_deleted(Change) of + false -> + {Props} = couch_util:get_value(doc, Change), + gen_server:call(?MODULE, {add_node, mem3_util:to_atom(Node), Props}); + true -> + gen_server:call(?MODULE, {remove_node, mem3_util:to_atom(Node)}) + end + end, + {ok, couch_util:get_value(<<"seq">>, Change)}; +changes_callback(timeout, _) -> + {ok, nil}. diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl new file mode 100644 index 000000000..ad7ac55f5 --- /dev/null +++ b/src/mem3/src/mem3_rep.erl @@ -0,0 +1,481 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_rep). + + +-export([ + go/2, + go/3, + make_local_id/2, + find_source_seq/4 +]). + +-export([ + changes_enumerator/3 +]). + + +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-record(acc, { + batch_size, + batch_count, + revcount = 0, + infos = [], + seq = 0, + localid, + source, + target, + filter, + db, + history = {[]} +}). + + +go(Source, Target) -> + go(Source, Target, []). 
+ + +go(DbName, Node, Opts) when is_binary(DbName), is_atom(Node) -> + go(#shard{name=DbName, node=node()}, #shard{name=DbName, node=Node}, Opts); + + +go(#shard{} = Source, #shard{} = Target, Opts) -> + mem3_sync_security:maybe_sync(Source, Target), + BatchSize = case proplists:get_value(batch_size, Opts) of + BS when is_integer(BS), BS > 0 -> BS; + _ -> 100 + end, + BatchCount = case proplists:get_value(batch_count, Opts) of + all -> all; + BC when is_integer(BC), BC > 0 -> BC; + _ -> 1 + end, + Filter = proplists:get_value(filter, Opts), + Acc = #acc{ + batch_size = BatchSize, + batch_count = BatchCount, + source = Source, + target = Target, + filter = Filter + }, + go(Acc). + + +go(#acc{source=Source, batch_count=BC}=Acc0) -> + case couch_db:open(Source#shard.name, [?ADMIN_CTX]) of + {ok, Db} -> + Acc = Acc0#acc{db=Db}, + Resp = try + repl(Db, Acc) + catch error:{not_found, no_db_file} -> + {error, missing_target} + after + couch_db:close(Db) + end, + case Resp of + {ok, P} when P > 0, BC == all -> + go(Acc); + {ok, P} when P > 0, BC > 1 -> + go(Acc#acc{batch_count=BC-1}); + Else -> + Else + end; + {not_found, no_db_file} -> + {error, missing_source} + end. + + +make_local_id(Source, Target) -> + make_local_id(Source, Target, undefined). + + +make_local_id(#shard{node=SourceNode}, #shard{node=TargetNode}, Filter) -> + make_local_id(SourceNode, TargetNode, Filter); + + +make_local_id(SourceThing, TargetThing, Filter) -> + S = couch_util:encodeBase64Url(couch_crypto:hash(md5, term_to_binary(SourceThing))), + T = couch_util:encodeBase64Url(couch_crypto:hash(md5, term_to_binary(TargetThing))), + F = case is_function(Filter) of + true -> + {new_uniq, Hash} = erlang:fun_info(Filter, new_uniq), + B = couch_util:encodeBase64Url(Hash), + <<"-", B/binary>>; + false -> + <<>> + end, + <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>. + + +%% @doc Find and return the largest update_seq in SourceDb +%% that the client has seen from TargetNode. 
+%% +%% When reasoning about this function it is very important to +%% understand the direction of replication for this comparison. +%% We're only interesting in internal replications initiated +%% by this node to the node being replaced. When doing a +%% replacement the most important thing is that the client doesn't +%% miss any updates. This means we can only fast-forward as far +%% as they've seen updates on this node. We can detect that by +%% looking for our push replication history and choosing the +%% largest source_seq that has a target_seq =< TgtSeq. +find_source_seq(SrcDb, TgtNode, TgtUUIDPrefix, TgtSeq) -> + case find_repl_doc(SrcDb, TgtUUIDPrefix) of + {ok, TgtUUID, Doc} -> + SrcNode = atom_to_binary(node(), utf8), + find_source_seq_int(Doc, SrcNode, TgtNode, TgtUUID, TgtSeq); + {not_found, _} -> + 0 + end. + + +find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) -> + SrcNode = case is_atom(SrcNode0) of + true -> atom_to_binary(SrcNode0, utf8); + false -> SrcNode0 + end, + TgtNode = case is_atom(TgtNode0) of + true -> atom_to_binary(TgtNode0, utf8); + false -> TgtNode0 + end, + % This is split off purely for the ability to run unit tests + % against this bit of code without requiring all sorts of mocks. + {History} = couch_util:get_value(<<"history">>, Props, {[]}), + SrcHistory = couch_util:get_value(SrcNode, History, []), + UseableHistory = lists:filter(fun({Entry}) -> + couch_util:get_value(<<"target_node">>, Entry) =:= TgtNode andalso + couch_util:get_value(<<"target_uuid">>, Entry) =:= TgtUUID andalso + couch_util:get_value(<<"target_seq">>, Entry) =< TgtSeq + end, SrcHistory), + + % This relies on SrcHistory being ordered desceding by source + % sequence. + case UseableHistory of + [{Entry} | _] -> + couch_util:get_value(<<"source_seq">>, Entry); + [] -> + 0 + end. 
+ + +repl(#db{name=DbName, seq_tree=Bt}=Db, Acc0) -> + erlang:put(io_priority, {internal_repl, DbName}), + #acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0#acc{source = Db}), + Fun = fun ?MODULE:changes_enumerator/3, + {ok, _, Acc2} = couch_btree:fold(Bt, Fun, Acc1, [{start_key, Seq + 1}]), + {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2), + {ok, couch_db:count_changes_since(Db, LastSeq)}. + + +calculate_start_seq(Acc) -> + #acc{ + source = Db, + target = #shard{node=Node, name=Name} + } = Acc, + %% Give the target our UUID and ask it to return the checkpoint doc + UUID = couch_db:get_uuid(Db), + {NewDocId, Doc} = mem3_rpc:load_checkpoint(Node, Name, node(), UUID), + #doc{id=FoundId, body={TProps}} = Doc, + Acc1 = Acc#acc{localid = NewDocId}, + % NewDocId and FoundId may be different the first time + % this code runs to save our newly named internal replication + % checkpoints. We store NewDocId to use when saving checkpoints + % but use FoundId to reuse the same docid that the target used. + case couch_db:open_doc(Db, FoundId, [ejson_body]) of + {ok, #doc{body = {SProps}}} -> + SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0), + TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0), + % We resume from the lower update seq stored in the two + % shard copies. We also need to be sure and use the + % corresponding history. A difference here could result + % from either a write failure on one of the nodes or if + % either shard was truncated by an operator. + case SourceSeq =< TargetSeq of + true -> + Seq = SourceSeq, + History = couch_util:get_value(<<"history">>, SProps, {[]}); + false -> + Seq = TargetSeq, + History = couch_util:get_value(<<"history">>, TProps, {[]}) + end, + Acc1#acc{seq = Seq, history = History}; + {not_found, _} -> + compare_epochs(Acc1) + end. 
+ +compare_epochs(Acc) -> + #acc{ + source = Db, + target = #shard{node=Node, name=Name} + } = Acc, + UUID = couch_db:get_uuid(Db), + Epochs = couch_db:get_epochs(Db), + Seq = mem3_rpc:find_common_seq(Node, Name, UUID, Epochs), + Acc#acc{seq = Seq, history = {[]}}. + +changes_enumerator(#doc_info{id=DocId}, Reds, #acc{db=Db}=Acc) -> + {ok, FDI} = couch_db:get_full_doc_info(Db, DocId), + changes_enumerator(FDI, Reds, Acc); +changes_enumerator(#full_doc_info{}=FDI, _, + #acc{revcount=C, infos=Infos}=Acc0) -> + #doc_info{ + high_seq=Seq, + revs=Revs + } = couch_doc:to_doc_info(FDI), + {Count, NewInfos} = case filter_doc(Acc0#acc.filter, FDI) of + keep -> {C + length(Revs), [FDI | Infos]}; + discard -> {C, Infos} + end, + Acc1 = Acc0#acc{ + seq=Seq, + revcount=Count, + infos=NewInfos + }, + Go = if Count < Acc1#acc.batch_size -> ok; true -> stop end, + {Go, Acc1}. + + +replicate_batch(#acc{target = #shard{node=Node, name=Name}} = Acc) -> + case find_missing_revs(Acc) of + [] -> + ok; + Missing -> + lists:map(fun(Chunk) -> + Docs = open_docs(Acc, Chunk), + ok = save_on_target(Node, Name, Docs) + end, chunk_revs(Missing)) + end, + update_locals(Acc), + {ok, Acc#acc{revcount=0, infos=[]}}. + + +find_missing_revs(Acc) -> + #acc{target = #shard{node=Node, name=Name}, infos = Infos} = Acc, + IdsRevs = lists:map(fun(FDI) -> + #doc_info{id=Id, revs=RevInfos} = couch_doc:to_doc_info(FDI), + {Id, [R || #rev_info{rev=R} <- RevInfos]} + end, Infos), + mem3_rpc:get_missing_revs(Node, Name, IdsRevs, [ + {io_priority, {internal_repl, Name}}, + ?ADMIN_CTX + ]). + + +chunk_revs(Revs) -> + Limit = list_to_integer(config:get("mem3", "rev_chunk_size", "5000")), + chunk_revs(Revs, Limit). + +chunk_revs(Revs, Limit) -> + chunk_revs(Revs, {0, []}, [], Limit). 
+ +chunk_revs([], {_Count, Chunk}, Chunks, _Limit) -> + [Chunk|Chunks]; +chunk_revs([{Id, R, A}|Revs], {Count, Chunk}, Chunks, Limit) when length(R) =< Limit - Count -> + chunk_revs( + Revs, + {Count + length(R), [{Id, R, A}|Chunk]}, + Chunks, + Limit + ); +chunk_revs([{Id, R, A}|Revs], {Count, Chunk}, Chunks, Limit) -> + {This, Next} = lists:split(Limit - Count, R), + chunk_revs( + [{Id, Next, A}|Revs], + {0, []}, + [[{Id, This, A}|Chunk]|Chunks], + Limit + ). + + +open_docs(#acc{source=Source, infos=Infos}, Missing) -> + lists:flatmap(fun({Id, Revs, _}) -> + FDI = lists:keyfind(Id, #full_doc_info.id, Infos), + #full_doc_info{rev_tree=RevTree} = FDI, + {FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs), + lists:map(fun({#leaf{deleted=IsDel, ptr=SummaryPtr}, FoundRevPath}) -> + couch_db:make_doc(Source, Id, IsDel, SummaryPtr, FoundRevPath) + end, FoundRevs) + end, Missing). + + +save_on_target(Node, Name, Docs) -> + mem3_rpc:update_docs(Node, Name, Docs, [ + replicated_changes, + full_commit, + ?ADMIN_CTX, + {io_priority, {internal_repl, Name}} + ]), + ok. + + +update_locals(Acc) -> + #acc{seq=Seq, source=Db, target=Target, localid=Id, history=History} = Acc, + #shard{name=Name, node=Node} = Target, + NewEntry = [ + {<<"source_node">>, atom_to_binary(node(), utf8)}, + {<<"source_uuid">>, couch_db:get_uuid(Db)}, + {<<"source_seq">>, Seq}, + {<<"timestamp">>, list_to_binary(iso8601_timestamp())} + ], + NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, NewEntry, History), + {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []). 
+ + +find_repl_doc(SrcDb, TgtUUIDPrefix) -> + SrcUUID = couch_db:get_uuid(SrcDb), + S = couch_util:encodeBase64Url(couch_crypto:hash(md5, term_to_binary(SrcUUID))), + DocIdPrefix = <<"_local/shard-sync-", S/binary, "-">>, + FoldFun = fun({DocId, {Rev0, {BodyProps}}}, _, _) -> + TgtUUID = couch_util:get_value(<<"target_uuid">>, BodyProps, <<>>), + case is_prefix(DocIdPrefix, DocId) of + true -> + case is_prefix(TgtUUIDPrefix, TgtUUID) of + true -> + Rev = list_to_binary(integer_to_list(Rev0)), + Doc = #doc{id=DocId, revs={0, [Rev]}, body={BodyProps}}, + {stop, {TgtUUID, Doc}}; + false -> + {ok, not_found} + end; + _ -> + {stop, not_found} + end + end, + Options = [{start_key, DocIdPrefix}], + case couch_btree:fold(SrcDb#db.local_tree, FoldFun, not_found, Options) of + {ok, _, {TgtUUID, Doc}} -> + {ok, TgtUUID, Doc}; + {ok, _, not_found} -> + {not_found, missing}; + Else -> + couch_log:error("Error finding replication doc: ~w", [Else]), + {not_found, missing} + end. + + +is_prefix(Prefix, Subject) -> + binary:longest_common_prefix([Prefix, Subject]) == size(Prefix). + + +filter_doc(Filter, FullDocInfo) when is_function(Filter) -> + try Filter(FullDocInfo) of + discard -> discard; + _ -> keep + catch _:_ -> + keep + end; +filter_doc(_, _) -> + keep. + + +iso8601_timestamp() -> + {_,_,Micro} = Now = os:timestamp(), + {{Year,Month,Date},{Hour,Minute,Second}} = calendar:now_to_datetime(Now), + Format = "~4.10.0B-~2.10.0B-~2.10.0BT~2.10.0B:~2.10.0B:~2.10.0B.~6.10.0BZ", + io_lib:format(Format, [Year, Month, Date, Hour, Minute, Second, Micro]). + + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + + +find_source_seq_unknown_node_test() -> + ?assertEqual( + find_source_seq_int(doc_(), <<"foo">>, <<"bing">>, <<"bar_uuid">>, 10), + 0 + ). + + +find_source_seq_unknown_uuid_test() -> + ?assertEqual( + find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"teapot">>, 10), + 0 + ). 
+ + +find_source_seq_ok_test() -> + ?assertEqual( + find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"bar_uuid">>, 100), + 100 + ). + + +find_source_seq_old_ok_test() -> + ?assertEqual( + find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"bar_uuid">>, 84), + 50 + ). + + +find_source_seq_different_node_test() -> + ?assertEqual( + find_source_seq_int(doc_(), <<"foo2">>, <<"bar">>, <<"bar_uuid">>, 92), + 31 + ). + + +-define(SNODE, <<"source_node">>). +-define(SUUID, <<"source_uuid">>). +-define(SSEQ, <<"source_seq">>). +-define(TNODE, <<"target_node">>). +-define(TUUID, <<"target_uuid">>). +-define(TSEQ, <<"target_seq">>). + +doc_() -> + Foo_Bar = [ + {[ + {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 100}, + {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 100} + ]}, + {[ + {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 90}, + {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 85} + ]}, + {[ + {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 50}, + {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 51} + ]}, + {[ + {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 40}, + {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 45} + ]}, + {[ + {?SNODE, <<"foo">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 2}, + {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 2} + ]} + ], + Foo2_Bar = [ + {[ + {?SNODE, <<"foo2">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 100}, + {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 100} + ]}, + {[ + {?SNODE, <<"foo2">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 92}, + {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 93} + ]}, + {[ + {?SNODE, <<"foo2">>}, {?SUUID, <<"foo_uuid">>}, {?SSEQ, 31}, + {?TNODE, <<"bar">>}, {?TUUID, <<"bar_uuid">>}, {?TSEQ, 30} + ]} + ], + History = {[ + {<<"foo">>, Foo_Bar}, + {<<"foo2">>, Foo2_Bar} + ]}, + #doc{ + body={[{<<"history">>, History}]} + }. + +-endif. 
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl new file mode 100644 index 000000000..93cb99ac9 --- /dev/null +++ b/src/mem3/src/mem3_rpc.erl @@ -0,0 +1,586 @@ +% Copyright 2013 Cloudant +% +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_rpc). + + +-export([ + find_common_seq/4, + get_missing_revs/4, + update_docs/4, + load_checkpoint/4, + save_checkpoint/6 +]). + +% Private RPC callbacks +-export([ + find_common_seq_rpc/3, + load_checkpoint_rpc/3, + save_checkpoint_rpc/5 +]). + + +-include("mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + + +get_missing_revs(Node, DbName, IdsRevs, Options) -> + rexi_call(Node, {fabric_rpc, get_missing_revs, [DbName, IdsRevs, Options]}). + + +update_docs(Node, DbName, Docs, Options) -> + rexi_call(Node, {fabric_rpc, update_docs, [DbName, Docs, Options]}). + + +load_checkpoint(Node, DbName, SourceNode, SourceUUID) -> + Args = [DbName, SourceNode, SourceUUID], + rexi_call(Node, {mem3_rpc, load_checkpoint_rpc, Args}). + + +save_checkpoint(Node, DbName, DocId, Seq, Entry, History) -> + Args = [DbName, DocId, Seq, Entry, History], + rexi_call(Node, {mem3_rpc, save_checkpoint_rpc, Args}). + + +find_common_seq(Node, DbName, SourceUUID, SourceEpochs) -> + Args = [DbName, SourceUUID, SourceEpochs], + rexi_call(Node, {mem3_rpc, find_common_seq_rpc, Args}). 
+ + +load_checkpoint_rpc(DbName, SourceNode, SourceUUID) -> + erlang:put(io_priority, {internal_repl, DbName}), + case get_or_create_db(DbName, [?ADMIN_CTX]) of + {ok, Db} -> + TargetUUID = couch_db:get_uuid(Db), + NewId = mem3_rep:make_local_id(SourceUUID, TargetUUID), + case couch_db:open_doc(Db, NewId, []) of + {ok, Doc} -> + rexi:reply({ok, {NewId, Doc}}); + {not_found, _} -> + OldId = mem3_rep:make_local_id(SourceNode, node()), + case couch_db:open_doc(Db, OldId, []) of + {ok, Doc} -> + rexi:reply({ok, {NewId, Doc}}); + {not_found, _} -> + rexi:reply({ok, {NewId, #doc{id = NewId}}}) + end + end; + Error -> + rexi:reply(Error) + end. + + +save_checkpoint_rpc(DbName, Id, SourceSeq, NewEntry0, History0) -> + erlang:put(io_priority, {internal_repl, DbName}), + case get_or_create_db(DbName, [?ADMIN_CTX]) of + {ok, #db{update_seq = TargetSeq} = Db} -> + NewEntry = {[ + {<<"target_node">>, atom_to_binary(node(), utf8)}, + {<<"target_uuid">>, couch_db:get_uuid(Db)}, + {<<"target_seq">>, TargetSeq} + ] ++ NewEntry0}, + Body = {[ + {<<"seq">>, SourceSeq}, + {<<"target_uuid">>, couch_db:get_uuid(Db)}, + {<<"history">>, add_checkpoint(NewEntry, History0)} + ]}, + Doc = #doc{id = Id, body = Body}, + rexi:reply(try couch_db:update_doc(Db, Doc, []) of + {ok, _} -> + {ok, Body}; + Else -> + {error, Else} + catch + Exception -> + Exception; + error:Reason -> + {error, Reason} + end); + Error -> + rexi:reply(Error) + end. + +find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) -> + erlang:put(io_priority, {internal_repl, DbName}), + case get_or_create_db(DbName, [?ADMIN_CTX]) of + {ok, Db} -> + case couch_db:get_uuid(Db) of + SourceUUID -> + TargetEpochs = couch_db:get_epochs(Db), + Seq = compare_epochs(SourceEpochs, TargetEpochs), + rexi:reply({ok, Seq}); + _Else -> + rexi:reply({ok, 0}) + end; + Error -> + rexi:reply(Error) + end. + + +%% @doc Return the sequence where two files with the same UUID diverged. 
+compare_epochs(SourceEpochs, TargetEpochs) -> + compare_rev_epochs( + lists:reverse(SourceEpochs), + lists:reverse(TargetEpochs) + ). + + +compare_rev_epochs([{Node, Seq} | SourceRest], [{Node, Seq} | TargetRest]) -> + % Common history, fast-forward + compare_epochs(SourceRest, TargetRest); +compare_rev_epochs([], [{_, TargetSeq} | _]) -> + % Source has not moved, start from seq just before the target took over + TargetSeq - 1; +compare_rev_epochs([{_, SourceSeq} | _], []) -> + % Target has not moved, start from seq where source diverged + SourceSeq; +compare_rev_epochs([{_, SourceSeq} | _], [{_, TargetSeq} | _]) -> + % The source was moved to a new location independently, take the minimum + erlang:min(SourceSeq, TargetSeq) - 1. + + +%% @doc This adds a new update sequence checkpoint to the replication +%% history. Checkpoints are keyed by the source node so that we +%% aren't mixing history between source shard moves. +add_checkpoint({Props}, {History}) -> + % Extract the source and target seqs for reference + SourceSeq = couch_util:get_value(<<"source_seq">>, Props), + TargetSeq = couch_util:get_value(<<"target_seq">>, Props), + + % Get the history relevant to the source node. + SourceNode = couch_util:get_value(<<"source_node">>, Props), + SourceHistory = couch_util:get_value(SourceNode, History, []), + + % If either the source or target shard has been truncated + % we need to filter out any history that was stored for + % any larger update seq than we're currently recording. + FilteredHistory = filter_history(SourceSeq, TargetSeq, SourceHistory), + + % Re-bucket our history based on the most recent source + % sequence. This is where we drop old checkpoints to + % maintain the exponential distribution. + {_, RebucketedHistory} = rebucket(FilteredHistory, SourceSeq, 0), + NewSourceHistory = [{Props} | RebucketedHistory], + + % Finally update the source node history and we're done. 
+ NodeRemoved = lists:keydelete(SourceNode, 1, History), + {[{SourceNode, NewSourceHistory} | NodeRemoved]}. + + +filter_history(SourceSeqThresh, TargetSeqThresh, History) -> + SourceFilter = fun({Entry}) -> + SourceSeq = couch_util:get_value(<<"source_seq">>, Entry), + SourceSeq < SourceSeqThresh + end, + TargetFilter = fun({Entry}) -> + TargetSeq = couch_util:get_value(<<"target_seq">>, Entry), + TargetSeq < TargetSeqThresh + end, + SourceFiltered = lists:filter(SourceFilter, History), + lists:filter(TargetFilter, SourceFiltered). + + +%% @doc This function adjusts our history to maintain a +%% history of checkpoints that follow an exponentially +%% increasing age from the most recent checkpoint. +%% +%% The terms newest and oldest used in these comments +%% refers to the (NewSeq - CurSeq) difference where smaller +%% values are considered newer. +%% +%% It works by assigning each entry to a bucket and keeping +%% the newest and oldest entry in each bucket. Keeping +%% both the newest and oldest means that we won't end up +%% with empty buckets as checkpoints are promoted to new +%% buckets. +%% +%% The return value of this function is a two-tuple of the +%% form `{BucketId, History}` where BucketId is the id of +%% the bucket for the first entry in History. This is used +%% when recursing to detect the oldest value in a given +%% bucket. +%% +%% This function expects the provided history to be sorted +%% in descending order of source_seq values. +rebucket([], _NewSeq, Bucket) -> + {Bucket+1, []}; +rebucket([{Entry} | RestHistory], NewSeq, Bucket) -> + CurSeq = couch_util:get_value(<<"source_seq">>, Entry), + case find_bucket(NewSeq, CurSeq, Bucket) of + Bucket -> + % This entry is in an existing bucket which means + % we will only keep it if its the oldest value + % in the bucket. To detect this we rebucket the + % rest of the list and only include Entry if the + % rest of the list is in a bigger bucket. 
+ case rebucket(RestHistory, NewSeq, Bucket) of + {Bucket, NewHistory} -> + % There's another entry in this bucket so we drop the + % current entry. + {Bucket, NewHistory}; + {NextBucket, NewHistory} when NextBucket > Bucket -> + % The rest of the history was rebucketed into a larger + % bucket so this is the oldest entry in the current + % bucket. + {Bucket, [{Entry} | NewHistory]} + end; + NextBucket when NextBucket > Bucket -> + % This entry is the newest in NextBucket so we add it + % to our history and continue rebucketing. + {_, NewHistory} = rebucket(RestHistory, NewSeq, NextBucket), + {NextBucket, [{Entry} | NewHistory]} + end. + + +%% @doc Find the bucket id for the given sequence pair. +find_bucket(NewSeq, CurSeq, Bucket) -> + % The +1 constant in this comparison is a bit subtle. The + % reason for it is to make sure that the first entry in + % the history is guaranteed to have a BucketId of 1. This + % also relies on never having a duplicated update + % sequence so adding 1 here guarantees a difference >= 2. + if (NewSeq - CurSeq + 1) > (2 bsl Bucket) -> + find_bucket(NewSeq, CurSeq, Bucket+1); + true -> + Bucket + end. + + +rexi_call(Node, MFA) -> + Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]), + Ref = rexi:cast(Node, self(), MFA, [sync]), + try + receive {Ref, {ok, Reply}} -> + Reply; + {Ref, Error} -> + erlang:error(Error); + {rexi_DOWN, Mon, _, Reason} -> + erlang:error({rexi_DOWN, {Node, Reason}}) + after 600000 -> + erlang:error(timeout) + end + after + rexi_monitor:stop(Mon) + end. + + +get_or_create_db(DbName, Options) -> + couch_db:open_int(DbName, [{create_if_missing, true} | Options]). + + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + + +-define(SNODE, <<"src@localhost">>). +-define(TNODE, <<"tgt@localhost">>). +-define(SNODE_KV, {<<"source_node">>, ?SNODE}). +-define(TNODE_KV, {<<"target_node">>, ?TNODE}). +-define(SSEQ, <<"source_seq">>). +-define(TSEQ, <<"target_seq">>). 
+-define(ENTRY(S, T), {[?SNODE_KV, {?SSEQ, S}, ?TNODE_KV, {?TSEQ, T}]}). + + +filter_history_data() -> + [ + ?ENTRY(13, 15), + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]. + + +filter_history_remove_none_test() -> + ?assertEqual(filter_history(20, 20, filter_history_data()), [ + ?ENTRY(13, 15), + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]). + + +filter_history_remove_all_test() -> + ?assertEqual(filter_history(1, 1, filter_history_data()), []). + + +filter_history_remove_equal_test() -> + ?assertEqual(filter_history(10, 10, filter_history_data()), [ + ?ENTRY(2, 3) + ]), + ?assertEqual(filter_history(11, 9, filter_history_data()), [ + ?ENTRY(2, 3) + ]). + + +filter_history_remove_for_source_and_target_test() -> + ?assertEqual(filter_history(11, 20, filter_history_data()), [ + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]), + ?assertEqual(filter_history(14, 14, filter_history_data()), [ + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]). + + +filter_history_remove_for_both_test() -> + ?assertEqual(filter_history(11, 11, filter_history_data()), [ + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]). + + +filter_history_remove_for_both_again_test() -> + ?assertEqual(filter_history(3, 4, filter_history_data()), [ + ?ENTRY(2, 3) + ]). + + +add_first_checkpoint_test() -> + History = {[]}, + ?assertEqual(add_checkpoint(?ENTRY(2, 3), History), {[ + {?SNODE, [ + ?ENTRY(2, 3) + ]} + ]}). + + +add_first_checkpoint_to_empty_test() -> + History = {[{?SNODE, []}]}, + ?assertEqual(add_checkpoint(?ENTRY(2, 3), History), {[ + {?SNODE, [ + ?ENTRY(2, 3) + ]} + ]}). + + +add_second_checkpoint_test() -> + History = {[{?SNODE, [?ENTRY(2, 3)]}]}, + ?assertEqual(add_checkpoint(?ENTRY(10, 9), History), {[ + {?SNODE, [ + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]} + ]}). + + +add_third_checkpoint_test() -> + History = {[{?SNODE, [ + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]}]}, + ?assertEqual(add_checkpoint(?ENTRY(11, 10), History), {[ + {?SNODE, [ + ?ENTRY(11, 10), + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]} + ]}). 
+ + +add_fourth_checkpoint_test() -> + History = {[{?SNODE, [ + ?ENTRY(11, 10), + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]}]}, + ?assertEqual(add_checkpoint(?ENTRY(12, 13), History), {[ + {?SNODE, [ + ?ENTRY(12, 13), + ?ENTRY(11, 10), + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]} + ]}). + + +add_checkpoint_with_replacement_test() -> + History = {[{?SNODE, [ + ?ENTRY(12, 13), + ?ENTRY(11, 10), + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]}]}, + % Picking a source_seq of 16 to force 10, 11, and 12 + % into the same bucket to show we drop the 11 entry. + ?assertEqual(add_checkpoint(?ENTRY(16, 16), History), {[ + {?SNODE, [ + ?ENTRY(16, 16), + ?ENTRY(12, 13), + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]} + ]}). + +add_checkpoint_drops_redundant_checkpoints_test() -> + % I've added comments showing the bucket ID based + % on the ?ENTRY passed to add_checkpoint + History = {[{?SNODE, [ + ?ENTRY(15, 15), % Bucket 0 + ?ENTRY(14, 14), % Bucket 1 + ?ENTRY(13, 13), % Bucket 1 + ?ENTRY(12, 12), % Bucket 2 + ?ENTRY(11, 11), % Bucket 2 + ?ENTRY(10, 10), % Bucket 2 + ?ENTRY(9, 9), % Bucket 2 + ?ENTRY(8, 8), % Bucket 3 + ?ENTRY(7, 7), % Bucket 3 + ?ENTRY(6, 6), % Bucket 3 + ?ENTRY(5, 5), % Bucket 3 + ?ENTRY(4, 4), % Bucket 3 + ?ENTRY(3, 3), % Bucket 3 + ?ENTRY(2, 2), % Bucket 3 + ?ENTRY(1, 1) % Bucket 3 + ]}]}, + ?assertEqual(add_checkpoint(?ENTRY(16, 16), History), {[ + {?SNODE, [ + ?ENTRY(16, 16), % Bucket 0 + ?ENTRY(15, 15), % Bucket 0 + ?ENTRY(14, 14), % Bucket 1 + ?ENTRY(13, 13), % Bucket 1 + ?ENTRY(12, 12), % Bucket 2 + ?ENTRY(9, 9), % Bucket 2 + ?ENTRY(8, 8), % Bucket 3 + ?ENTRY(1, 1) % Bucket 3 + ]} + ]}). + + +add_checkpoint_show_not_always_a_drop_test() -> + % Depending on the edge conditions of buckets we + % may not always drop values when adding new + % checkpoints. In this case 12 stays because there's + % no longer a value for 10 or 11. 
+    %
+    % I've added comments showing the bucket ID based
+    % on the ?ENTRY passed to add_checkpoint
+    History = {[{?SNODE, [
+        ?ENTRY(16, 16), % Bucket 0
+        ?ENTRY(15, 15), % Bucket 1
+        ?ENTRY(14, 14), % Bucket 1
+        ?ENTRY(13, 13), % Bucket 2
+        ?ENTRY(12, 12), % Bucket 2
+        ?ENTRY(9, 9), % Bucket 3
+        ?ENTRY(8, 8), % Bucket 3
+        ?ENTRY(1, 1) % Bucket 4
+    ]}]},
+    ?assertEqual(add_checkpoint(?ENTRY(17, 17), History), {[
+        {?SNODE, [
+            ?ENTRY(17, 17), % Bucket 0
+            ?ENTRY(16, 16), % Bucket 0
+            ?ENTRY(15, 15), % Bucket 1
+            ?ENTRY(14, 14), % Bucket 1
+            ?ENTRY(13, 13), % Bucket 2
+            ?ENTRY(12, 12), % Bucket 2
+            ?ENTRY(9, 9), % Bucket 3
+            ?ENTRY(8, 8), % Bucket 3
+            ?ENTRY(1, 1) % Bucket 4
+        ]}
+    ]}).
+
+
+add_checkpoint_big_jump_show_lots_drop_test() ->
+    % I've added comments showing the bucket ID based
+    % on the ?ENTRY passed to add_checkpoint
+    History = {[{?SNODE, [
+        ?ENTRY(16, 16), % Bucket 4
+        ?ENTRY(15, 15), % Bucket 4
+        ?ENTRY(14, 14), % Bucket 4
+        ?ENTRY(13, 13), % Bucket 4
+        ?ENTRY(12, 12), % Bucket 4
+        ?ENTRY(9, 9), % Bucket 4
+        ?ENTRY(8, 8), % Bucket 4
+        ?ENTRY(1, 1) % Bucket 4
+    ]}]},
+    ?assertEqual(add_checkpoint(?ENTRY(32, 32), History), {[
+        {?SNODE, [
+            ?ENTRY(32, 32), % Bucket 0
+            ?ENTRY(16, 16), % Bucket 4
+            ?ENTRY(1, 1) % Bucket 4
+        ]}
+    ]}).
+
+
+add_checkpoint_show_filter_history_test() ->
+    History = {[{?SNODE, [
+        ?ENTRY(16, 16),
+        ?ENTRY(15, 15),
+        ?ENTRY(14, 14),
+        ?ENTRY(13, 13),
+        ?ENTRY(12, 12),
+        ?ENTRY(9, 9),
+        ?ENTRY(8, 8),
+        ?ENTRY(1, 1)
+    ]}]},
+    % Drop for both
+    ?assertEqual(add_checkpoint(?ENTRY(10, 10), History), {[
+        {?SNODE, [
+            ?ENTRY(10, 10),
+            ?ENTRY(9, 9),
+            ?ENTRY(8, 8),
+            ?ENTRY(1, 1)
+        ]}
+    ]}),
+    % Drop for source
+    ?assertEqual(add_checkpoint(?ENTRY(10, 200), History), {[
+        {?SNODE, [
+            ?ENTRY(10, 200),
+            ?ENTRY(9, 9),
+            ?ENTRY(8, 8),
+            ?ENTRY(1, 1)
+        ]}
+    ]}),
+    % Drop for target. Obviously a source_seq of 200
+    % will end up dropping the 8 entry.
+ ?assertEqual(add_checkpoint(?ENTRY(200, 10), History), {[ + {?SNODE, [ + ?ENTRY(200, 10), + ?ENTRY(9, 9), + ?ENTRY(1, 1) + ]} + ]}). + + +add_checkpoint_from_other_node_test() -> + History = {[{<<"not_the_source">>, [ + ?ENTRY(12, 13), + ?ENTRY(11, 10), + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]}]}, + % No filtering + ?assertEqual(add_checkpoint(?ENTRY(1, 1), History), {[ + {?SNODE, [ + ?ENTRY(1, 1) + ]}, + {<<"not_the_source">>, [ + ?ENTRY(12, 13), + ?ENTRY(11, 10), + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]} + ]}), + % No dropping + ?assertEqual(add_checkpoint(?ENTRY(200, 200), History), {[ + {?SNODE, [ + ?ENTRY(200, 200) + ]}, + {<<"not_the_source">>, [ + ?ENTRY(12, 13), + ?ENTRY(11, 10), + ?ENTRY(10, 9), + ?ENTRY(2, 3) + ]} + ]}). + + +-endif. diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl new file mode 100644 index 000000000..c7f33c61f --- /dev/null +++ b/src/mem3/src/mem3_shards.erl @@ -0,0 +1,419 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_shards). +-behaviour(gen_server). +-vsn(3). +-behaviour(config_listener). + +-export([init/1, terminate/2, code_change/3]). +-export([handle_call/3, handle_cast/2, handle_info/2]). +-export([handle_config_change/5, handle_config_terminate/3]). + +-export([start_link/0]). +-export([for_db/1, for_db/2, for_docid/2, for_docid/3, get/3, local/1, fold/2]). +-export([for_shard_name/1]). +-export([set_max_size/1]). + +-record(st, { + max_size = 25000, + cur_size = 0, + changes_pid +}). 
+ +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-define(DBS, mem3_dbs). +-define(SHARDS, mem3_shards). +-define(ATIMES, mem3_atimes). +-define(RELISTEN_DELAY, 5000). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +for_db(DbName) -> + for_db(DbName, []). + +for_db(DbName, Options) -> + Shards = try ets:lookup(?SHARDS, DbName) of + [] -> + load_shards_from_disk(DbName); + Else -> + gen_server:cast(?MODULE, {cache_hit, DbName}), + Else + catch error:badarg -> + load_shards_from_disk(DbName) + end, + case lists:member(ordered, Options) of + true -> Shards; + false -> mem3_util:downcast(Shards) + end. + +for_docid(DbName, DocId) -> + for_docid(DbName, DocId, []). + +for_docid(DbName, DocId, Options) -> + HashKey = mem3_util:hash(DocId), + ShardHead = #shard{ + name = '_', + node = '_', + dbname = DbName, + range = ['$1','$2'], + ref = '_' + }, + OrderedShardHead = #ordered_shard{ + name = '_', + node = '_', + dbname = DbName, + range = ['$1','$2'], + ref = '_', + order = '_' + }, + Conditions = [{'=<', '$1', HashKey}, {'=<', HashKey, '$2'}], + ShardSpec = {ShardHead, Conditions, ['$_']}, + OrderedShardSpec = {OrderedShardHead, Conditions, ['$_']}, + Shards = try ets:select(?SHARDS, [ShardSpec, OrderedShardSpec]) of + [] -> + load_shards_from_disk(DbName, DocId); + Else -> + gen_server:cast(?MODULE, {cache_hit, DbName}), + Else + catch error:badarg -> + load_shards_from_disk(DbName, DocId) + end, + case lists:member(ordered, Options) of + true -> Shards; + false -> mem3_util:downcast(Shards) + end. + +for_shard_name(ShardName) -> + for_shard_name(ShardName, []). 
+ +for_shard_name(ShardName, Options) -> + DbName = mem3:dbname(ShardName), + ShardHead = #shard{ + name = ShardName, + node = '_', + dbname = DbName, + range = '_', + ref = '_' + }, + OrderedShardHead = #ordered_shard{ + name = ShardName, + node = '_', + dbname = DbName, + range = '_', + ref = '_', + order = '_' + }, + ShardSpec = {ShardHead, [], ['$_']}, + OrderedShardSpec = {OrderedShardHead, [], ['$_']}, + Shards = try ets:select(?SHARDS, [ShardSpec, OrderedShardSpec]) of + [] -> + filter_shards_by_name(ShardName, load_shards_from_disk(DbName)); + Else -> + gen_server:cast(?MODULE, {cache_hit, DbName}), + Else + catch error:badarg -> + filter_shards_by_name(ShardName, load_shards_from_disk(DbName)) + end, + case lists:member(ordered, Options) of + true -> Shards; + false -> mem3_util:downcast(Shards) + end. + +get(DbName, Node, Range) -> + Res = lists:foldl(fun(#shard{node=N, range=R}=S, Acc) -> + case {N, R} of + {Node, Range} -> [S | Acc]; + _ -> Acc + end + end, [], for_db(DbName)), + case Res of + [] -> {error, not_found}; + [Shard] -> {ok, Shard}; + [_|_] -> {error, duplicates} + end. + +local(DbName) when is_list(DbName) -> + local(list_to_binary(DbName)); +local(DbName) -> + Pred = fun(#shard{node=Node}) when Node == node() -> true; (_) -> false end, + lists:filter(Pred, for_db(DbName)). + +fold(Fun, Acc) -> + DbName = config:get("mem3", "shards_db", "_dbs"), + {ok, Db} = mem3_util:ensure_exists(DbName), + FAcc = {Db, Fun, Acc}, + try + {ok, _, LastAcc} = couch_db:enum_docs(Db, fun fold_fun/3, FAcc, []), + {_Db, _UFun, UAcc} = LastAcc, + UAcc + after + couch_db:close(Db) + end. + +set_max_size(Size) when is_integer(Size), Size > 0 -> + gen_server:call(?MODULE, {set_max_size, Size}). 
+ +handle_config_change("mem3", "shard_cache_size", SizeList, _, _) -> + Size = list_to_integer(SizeList), + {ok, gen_server:call(?MODULE, {set_max_size, Size}, infinity)}; +handle_config_change("mem3", "shards_db", _DbName, _, _) -> + {ok, gen_server:call(?MODULE, shard_db_changed, infinity)}; +handle_config_change(_, _, _, _, _) -> + {ok, nil}. + +handle_config_terminate(_, stop, _) -> + ok; +handle_config_terminate(_Server, _Reason, _State) -> + erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener). + +init([]) -> + ets:new(?SHARDS, [ + bag, + protected, + named_table, + {keypos,#shard.dbname}, + {read_concurrency, true} + ]), + ets:new(?DBS, [set, protected, named_table]), + ets:new(?ATIMES, [ordered_set, protected, named_table]), + ok = config:listen_for_changes(?MODULE, nil), + SizeList = config:get("mem3", "shard_cache_size", "25000"), + {Pid, _} = spawn_monitor(fun() -> listen_for_changes(get_update_seq()) end), + {ok, #st{ + max_size = list_to_integer(SizeList), + cur_size = 0, + changes_pid = Pid + }}. + +handle_call({set_max_size, Size}, _From, St) -> + {reply, ok, cache_free(St#st{max_size=Size})}; +handle_call(shard_db_changed, _From, St) -> + exit(St#st.changes_pid, shard_db_changed), + {reply, ok, St}; +handle_call(_Call, _From, St) -> + {noreply, St}. + +handle_cast({cache_hit, DbName}, St) -> + couch_stats:increment_counter([mem3, shard_cache, hit]), + cache_hit(DbName), + {noreply, St}; +handle_cast({cache_insert, DbName, Shards}, St) -> + couch_stats:increment_counter([mem3, shard_cache, miss]), + {noreply, cache_free(cache_insert(St, DbName, Shards))}; +handle_cast({cache_remove, DbName}, St) -> + couch_stats:increment_counter([mem3, shard_cache, eviction]), + {noreply, cache_remove(St, DbName)}; +handle_cast(_Msg, St) -> + {noreply, St}. 
+ +handle_info({'DOWN', _, _, Pid, Reason}, #st{changes_pid=Pid}=St) -> + {NewSt, Seq} = case Reason of + {seq, EndSeq} -> + {St, EndSeq}; + shard_db_changed -> + {cache_clear(St), get_update_seq()}; + _ -> + couch_log:notice("~p changes listener died ~p", [?MODULE, Reason]), + {St, get_update_seq()} + end, + erlang:send_after(5000, self(), {start_listener, Seq}), + {noreply, NewSt#st{changes_pid=undefined}}; +handle_info({start_listener, Seq}, St) -> + {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end), + {noreply, St#st{changes_pid=NewPid}}; +handle_info(restart_config_listener, State) -> + ok = config:listen_for_changes(?MODULE, nil), + {noreply, State}; +handle_info(_Msg, St) -> + {noreply, St}. + +terminate(_Reason, #st{changes_pid=Pid}) -> + exit(Pid, kill), + ok. + +code_change(_OldVsn, #st{}=St, _Extra) -> + {ok, St}. + +%% internal functions + +fold_fun(#full_doc_info{}=FDI, _, Acc) -> + DI = couch_doc:to_doc_info(FDI), + fold_fun(DI, nil, Acc); +fold_fun(#doc_info{}=DI, _, {Db, UFun, UAcc}) -> + case couch_db:open_doc(Db, DI, [ejson_body, conflicts]) of + {ok, Doc} -> + {Props} = Doc#doc.body, + Shards = mem3_util:build_shards(Doc#doc.id, Props), + NewUAcc = lists:foldl(UFun, UAcc, Shards), + {ok, {Db, UFun, NewUAcc}}; + _ -> + {ok, {Db, UFun, UAcc}} + end. + +get_update_seq() -> + DbName = config:get("mem3", "shards_db", "_dbs"), + {ok, Db} = mem3_util:ensure_exists(DbName), + couch_db:close(Db), + Db#db.update_seq. + +listen_for_changes(Since) -> + DbName = config:get("mem3", "shards_db", "_dbs"), + {ok, Db} = mem3_util:ensure_exists(DbName), + Args = #changes_args{ + feed = "continuous", + since = Since, + heartbeat = true, + include_docs = true + }, + ChangesFun = couch_changes:handle_db_changes(Args, Since, Db), + ChangesFun(fun changes_callback/2). 
+ +changes_callback(start, Acc) -> + {ok, Acc}; +changes_callback({stop, EndSeq}, _) -> + exit({seq, EndSeq}); +changes_callback({change, {Change}, _}, _) -> + DbName = couch_util:get_value(<<"id">>, Change), + case DbName of <<"_design/", _/binary>> -> ok; _Else -> + case mem3_util:is_deleted(Change) of + true -> + gen_server:cast(?MODULE, {cache_remove, DbName}); + false -> + case couch_util:get_value(doc, Change) of + {error, Reason} -> + couch_log:error("missing partition table for ~s: ~p", + [DbName, Reason]); + {Doc} -> + Shards = mem3_util:build_ordered_shards(DbName, Doc), + gen_server:cast(?MODULE, {cache_insert, DbName, Shards}), + [create_if_missing(mem3:name(S)) || S + <- Shards, mem3:node(S) =:= node()] + end + end + end, + {ok, couch_util:get_value(<<"seq">>, Change)}; +changes_callback(timeout, _) -> + ok. + +load_shards_from_disk(DbName) when is_binary(DbName) -> + X = ?l2b(config:get("mem3", "shards_db", "_dbs")), + {ok, Db} = mem3_util:ensure_exists(X), + try + load_shards_from_db(Db, DbName) + after + couch_db:close(Db) + end. + +load_shards_from_db(#db{} = ShardDb, DbName) -> + case couch_db:open_doc(ShardDb, DbName, [ejson_body]) of + {ok, #doc{body = {Props}}} -> + Shards = mem3_util:build_ordered_shards(DbName, Props), + gen_server:cast(?MODULE, {cache_insert, DbName, Shards}), + Shards; + {not_found, _} -> + erlang:error(database_does_not_exist, ?b2l(DbName)) + end. + +load_shards_from_disk(DbName, DocId)-> + Shards = load_shards_from_disk(DbName), + HashKey = mem3_util:hash(DocId), + [S || S <- Shards, in_range(S, HashKey)]. + +in_range(Shard, HashKey) -> + [B, E] = mem3:range(Shard), + B =< HashKey andalso HashKey =< E. 
+
+% Ensure the shard file for Name exists on disk, creating an empty
+% database when it is missing. Failures from couch_server:create/2 are
+% only logged: this is a best-effort call made from the changes feed
+% listener, which must not die over a single shard.
+create_if_missing(Name) ->
+    DbDir = config:get("couchdb", "database_dir"),
+    Filename = filename:join(DbDir, ?b2l(Name) ++ ".couch"),
+    case filelib:is_regular(Filename) of
+        true ->
+            ok;
+        false ->
+            case couch_server:create(Name, [?ADMIN_CTX]) of
+                {ok, Db} ->
+                    couch_db:close(Db);
+                Error ->
+                    couch_log:error("~p tried to create ~s, got ~p",
+                        [?MODULE, Name, Error])
+            end
+    end.
+
+% Cache the shard map for DbName, replacing any stale entry, and record
+% a fresh access time in the LRU index. Returns the updated state; the
+% size counter only grows when the db wasn't already cached.
+%
+% Access-time keys are erlang:unique_integer([monotonic]) values rather
+% than the deprecated erlang:now/0 (deprecated in OTP 18, and backed by
+% a global lock). The keys are never interpreted as wall-clock time -
+% they only need to be unique and strictly increasing so the ?ATIMES
+% ordered_set keeps entries in least-recently-used order, which
+% unique_integer([monotonic]) guarantees. NOTE(review): requires
+% OTP >= 18; confirm against the project's minimum supported release.
+cache_insert(#st{cur_size=Cur}=St, DbName, Shards) ->
+    NewATime = erlang:unique_integer([monotonic]),
+    true = ets:delete(?SHARDS, DbName),
+    true = ets:insert(?SHARDS, Shards),
+    case ets:lookup(?DBS, DbName) of
+        [{DbName, ATime}] ->
+            % Already cached: just move the entry to the young end of
+            % the LRU index.
+            true = ets:delete(?ATIMES, ATime),
+            true = ets:insert(?ATIMES, {NewATime, DbName}),
+            true = ets:insert(?DBS, {DbName, NewATime}),
+            St;
+        [] ->
+            true = ets:insert(?ATIMES, {NewATime, DbName}),
+            true = ets:insert(?DBS, {DbName, NewATime}),
+            St#st{cur_size=Cur + 1}
+    end.
+
+% Drop DbName's shard map from the cache along with its LRU
+% bookkeeping. A no-op (and no size change) if the db isn't cached.
+cache_remove(#st{cur_size=Cur}=St, DbName) ->
+    true = ets:delete(?SHARDS, DbName),
+    case ets:lookup(?DBS, DbName) of
+        [{DbName, ATime}] ->
+            true = ets:delete(?DBS, DbName),
+            true = ets:delete(?ATIMES, ATime),
+            St#st{cur_size=Cur-1};
+        [] ->
+            St
+    end.
+
+% Refresh DbName's position in the LRU index after a cache hit. Ignores
+% dbs that have been evicted between the hit and this cast.
+cache_hit(DbName) ->
+    case ets:lookup(?DBS, DbName) of
+        [{DbName, ATime}] ->
+            NewATime = erlang:unique_integer([monotonic]),
+            true = ets:delete(?ATIMES, ATime),
+            true = ets:insert(?ATIMES, {NewATime, DbName}),
+            true = ets:insert(?DBS, {DbName, NewATime});
+        [] ->
+            ok
+    end.
+
+% Evict least-recently-used entries until the cache is strictly below
+% max_size. ets:first/1 on the ordered_set yields the smallest (oldest)
+% access-time key.
+cache_free(#st{max_size=Max, cur_size=Cur}=St) when Max =< Cur ->
+    ATime = ets:first(?ATIMES),
+    [{ATime, DbName}] = ets:lookup(?ATIMES, ATime),
+    true = ets:delete(?ATIMES, ATime),
+    true = ets:delete(?DBS, DbName),
+    true = ets:delete(?SHARDS, DbName),
+    cache_free(St#st{cur_size=Cur-1});
+cache_free(St) ->
+    St.
+
+% Empty all three cache tables and reset the size counter.
+cache_clear(St) ->
+    true = ets:delete_all_objects(?DBS),
+    true = ets:delete_all_objects(?SHARDS),
+    true = ets:delete_all_objects(?ATIMES),
+    St#st{cur_size=0}.
+
+% Return the subset of Shards (plain or ordered records) whose name
+% matches Name exactly.
+filter_shards_by_name(Name, Shards) ->
+    filter_shards_by_name(Name, [], Shards).
+ +filter_shards_by_name(_, Matches, []) -> + Matches; +filter_shards_by_name(Name, Matches, [#ordered_shard{name=Name}=S|Ss]) -> + filter_shards_by_name(Name, [S|Matches], Ss); +filter_shards_by_name(Name, Matches, [#shard{name=Name}=S|Ss]) -> + filter_shards_by_name(Name, [S|Matches], Ss); +filter_shards_by_name(Name, Matches, [_|Ss]) -> + filter_shards_by_name(Name, Matches, Ss). diff --git a/src/mem3/src/mem3_sup.erl b/src/mem3/src/mem3_sup.erl new file mode 100644 index 000000000..80b8ca37f --- /dev/null +++ b/src/mem3/src/mem3_sup.erl @@ -0,0 +1,35 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_sup). +-behaviour(supervisor). +-export([start_link/0, init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init(_Args) -> + Children = [ + child(mem3_events), + child(mem3_nodes), + child(mem3_sync_nodes), % Order important? + child(mem3_sync), + child(mem3_shards), + child(mem3_sync_event_listener) + ], + {ok, {{one_for_one,10,1}, couch_epi:register_service(mem3_epi, Children)}}. + +child(mem3_events) -> + MFA = {gen_event, start_link, [{local, mem3_events}]}, + {mem3_events, MFA, permanent, 1000, worker, dynamic}; +child(Child) -> + {Child, {Child, start_link, []}, permanent, 1000, worker, [Child]}. 
diff --git a/src/mem3/src/mem3_sync.erl b/src/mem3/src/mem3_sync.erl new file mode 100644 index 000000000..640181509 --- /dev/null +++ b/src/mem3/src/mem3_sync.erl @@ -0,0 +1,319 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_sync). +-behaviour(gen_server). +-vsn(1). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([start_link/0, get_active/0, get_queue/0, push/1, push/2, + remove_node/1, remove_shard/1, initial_sync/1, get_backlog/0, nodes_db/0, + shards_db/0, users_db/0, find_next_node/0]). + +-import(queue, [in/2, out/1, to_list/1, join/2, from_list/1, is_empty/1]). + +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-record(state, { + active = [], + count = 0, + limit, + dict = dict:new(), + waiting = queue:new() +}). + +-record(job, {name, node, count=nil, pid=nil}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +get_active() -> + gen_server:call(?MODULE, get_active). + +get_queue() -> + gen_server:call(?MODULE, get_queue). + +get_backlog() -> + gen_server:call(?MODULE, get_backlog). + +push(#shard{name = Name}, Target) -> + push(Name, Target); +push(Name, #shard{node=Node}) -> + push(Name, Node); +push(Name, Node) -> + push(#job{name = Name, node = Node}). + +push(#job{node = Node} = Job) when Node =/= node() -> + gen_server:cast(?MODULE, {push, Job}); +push(_) -> + ok. 
+ +remove_node(Node) -> + gen_server:cast(?MODULE, {remove_node, Node}). + +remove_shard(Shard) -> + gen_server:cast(?MODULE, {remove_shard, Shard}). + +init([]) -> + process_flag(trap_exit, true), + Concurrency = config:get("mem3", "sync_concurrency", "10"), + gen_event:add_handler(mem3_events, mem3_sync_event, []), + initial_sync(), + {ok, #state{limit = list_to_integer(Concurrency)}}. + +handle_call({push, Job}, From, State) -> + handle_cast({push, Job#job{pid = From}}, State); + +handle_call(get_active, _From, State) -> + {reply, State#state.active, State}; + +handle_call(get_queue, _From, State) -> + {reply, to_list(State#state.waiting), State}; + +handle_call(get_backlog, _From, #state{active=A, waiting=WQ} = State) -> + CA = lists:sum([C || #job{count=C} <- A, is_integer(C)]), + CW = lists:sum([C || #job{count=C} <- to_list(WQ), is_integer(C)]), + {reply, CA+CW, State}. + +handle_cast({push, DbName, Node}, State) -> + handle_cast({push, #job{name = DbName, node = Node}}, State); + +handle_cast({push, Job}, #state{count=Count, limit=Limit} = State) + when Count >= Limit -> + {noreply, add_to_queue(State, Job)}; + +handle_cast({push, Job}, State) -> + #state{active = L, count = C} = State, + #job{name = DbName, node = Node} = Job, + case is_running(DbName, Node, L) of + true -> + {noreply, add_to_queue(State, Job)}; + false -> + Pid = start_push_replication(Job), + {noreply, State#state{active=[Job#job{pid=Pid}|L], count=C+1}} + end; + +handle_cast({remove_node, Node}, #state{waiting = W0} = State) -> + {Alive, Dead} = lists:partition(fun(#job{node=N}) -> N =/= Node end, to_list(W0)), + Dict = remove_entries(State#state.dict, Dead), + [exit(Pid, die_now) || #job{node=N, pid=Pid} <- State#state.active, + N =:= Node], + {noreply, State#state{dict = Dict, waiting = from_list(Alive)}}; + +handle_cast({remove_shard, Shard}, #state{waiting = W0} = State) -> + {Alive, Dead} = lists:partition(fun(#job{name=S}) -> + S =/= Shard end, to_list(W0)), + Dict = 
remove_entries(State#state.dict, Dead), + [exit(Pid, die_now) || #job{name=S, pid=Pid} <- State#state.active, + S =:= Shard], + {noreply, State#state{dict = Dict, waiting = from_list(Alive)}}. + +handle_info({'EXIT', Active, normal}, State) -> + handle_replication_exit(State, Active); + +handle_info({'EXIT', Active, die_now}, State) -> + % we forced this one ourselves, do not retry + handle_replication_exit(State, Active); + +handle_info({'EXIT', Active, {{not_found, no_db_file}, _Stack}}, State) -> + % target doesn't exist, do not retry + handle_replication_exit(State, Active); + +handle_info({'EXIT', Active, Reason}, State) -> + NewState = case lists:keyfind(Active, #job.pid, State#state.active) of + #job{name=OldDbName, node=OldNode} = Job -> + couch_log:warning("~s ~s ~s ~w", [?MODULE, OldDbName, OldNode, Reason]), + case Reason of {pending_changes, Count} -> + maybe_resubmit(State, Job#job{pid = nil, count = Count}); + _ -> + try mem3:shards(mem3:dbname(Job#job.name)) of _ -> + timer:apply_after(5000, ?MODULE, push, [Job#job{pid=nil}]) + catch error:database_does_not_exist -> + % no need to retry + ok + end, + State + end; + false -> State end, + handle_replication_exit(NewState, Active); + +handle_info(Msg, State) -> + couch_log:notice("unexpected msg at replication manager ~p", [Msg]), + {noreply, State}. + +terminate(_Reason, State) -> + [exit(Pid, shutdown) || #job{pid=Pid} <- State#state.active], + ok. + +code_change(_, #state{waiting = WaitingList} = State, _) when is_list(WaitingList) -> + {ok, State#state{waiting = from_list(WaitingList)}}; + +code_change(_, State, _) -> + {ok, State}. + +maybe_resubmit(State, #job{name=DbName, node=Node} = Job) -> + case lists:member(DbName, local_dbs()) of + true -> + case find_next_node() of + Node -> + add_to_queue(State, Job); + _ -> + State % don't resubmit b/c we have a new replication target + end; + false -> + add_to_queue(State, Job) + end. 
+ +handle_replication_exit(State, Pid) -> + #state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State, + Active1 = lists:keydelete(Pid, #job.pid, Active), + case is_empty(Waiting) of + true -> + {noreply, State#state{active=Active1, count=length(Active1)}}; + _ -> + Count = length(Active1), + NewState = if Count < Limit -> + case next_replication(Active1, Waiting, queue:new()) of + nil -> % all waiting replications are also active + State#state{active = Active1, count = Count}; + {#job{name=DbName, node=Node} = Job, StillWaiting} -> + NewPid = start_push_replication(Job), + State#state{ + active = [Job#job{pid = NewPid} | Active1], + count = Count+1, + dict = dict:erase({DbName,Node}, D), + waiting = StillWaiting + } + end; + true -> + State#state{active = Active1, count=Count} + end, + {noreply, NewState} + end. + +start_push_replication(#job{name=Name, node=Node, pid=From}) -> + if From =/= nil -> gen_server:reply(From, ok); true -> ok end, + spawn_link(fun() -> + case mem3_rep:go(Name, maybe_redirect(Node)) of + {ok, Pending} when Pending > 0 -> + exit({pending_changes, Pending}); + _ -> + ok + end + end). + +add_to_queue(State, #job{name=DbName, node=Node, pid=From} = Job) -> + #state{dict=D, waiting=WQ} = State, + case dict:is_key({DbName, Node}, D) of + true -> + if From =/= nil -> gen_server:reply(From, ok); true -> ok end, + State; + false -> + couch_log:debug("adding ~s -> ~p to mem3_sync queue", [DbName, Node]), + State#state{ + dict = dict:store({DbName,Node}, ok, D), + waiting = in(Job, WQ) + } + end. + +sync_nodes_and_dbs() -> + Node = find_next_node(), + [push(Db, Node) || Db <- local_dbs()]. + +initial_sync() -> + [net_kernel:connect_node(Node) || Node <- mem3:nodes()], + mem3_sync_nodes:add(nodes()). + +initial_sync(Live) -> + sync_nodes_and_dbs(), + Acc = {node(), Live, []}, + {_, _, Shards} = mem3_shards:fold(fun initial_sync_fold/2, Acc), + submit_replication_tasks(node(), Live, Shards). 
+ +initial_sync_fold(#shard{dbname = Db} = Shard, {LocalNode, Live, AccShards}) -> + case AccShards of + [#shard{dbname = AccDb} | _] when Db =/= AccDb -> + submit_replication_tasks(LocalNode, Live, AccShards), + {LocalNode, Live, [Shard]}; + _ -> + {LocalNode, Live, [Shard|AccShards]} + end. + +submit_replication_tasks(LocalNode, Live, Shards) -> + SplitFun = fun(#shard{node = Node}) -> Node =:= LocalNode end, + {Local, Remote} = lists:partition(SplitFun, Shards), + lists:foreach(fun(#shard{name = ShardName}) -> + [sync_push(ShardName, N) || #shard{node=N, name=Name} <- Remote, + Name =:= ShardName, lists:member(N, Live)] + end, Local). + +sync_push(ShardName, N) -> + gen_server:call(mem3_sync, {push, #job{name=ShardName, node=N}}, infinity). + + + +find_next_node() -> + LiveNodes = [node()|nodes()], + AllNodes0 = lists:sort(mem3:nodes()), + AllNodes1 = [X || X <- AllNodes0, lists:member(X, LiveNodes)], + AllNodes = AllNodes1 ++ [hd(AllNodes1)], + [_Self, Next| _] = lists:dropwhile(fun(N) -> N =/= node() end, AllNodes), + Next. + +%% @doc Finds the next {DbName,Node} pair in the list of waiting replications +%% which does not correspond to an already running replication +-spec next_replication([#job{}], queue:queue(_), queue:queue(_)) -> + {#job{}, queue:queue(_)} | nil. +next_replication(Active, Waiting, WaitingAndRunning) -> + case is_empty(Waiting) of + true -> + nil; + false -> + {{value, #job{name=S, node=N} = Job}, RemQ} = out(Waiting), + case is_running(S,N,Active) of + true -> + next_replication(Active, RemQ, in(Job, WaitingAndRunning)); + false -> + {Job, join(RemQ, WaitingAndRunning)} + end + end. + +is_running(DbName, Node, ActiveList) -> + [] =/= [true || #job{name=S, node=N} <- ActiveList, S=:=DbName, N=:=Node]. + +remove_entries(Dict, Entries) -> + lists:foldl(fun(#job{name=S, node=N}, D) -> + dict:erase({S, N}, D) + end, Dict, Entries). + +local_dbs() -> + [nodes_db(), shards_db(), users_db()]. 
+ +nodes_db() -> + ?l2b(config:get("mem3", "nodes_db", "_nodes")). + +shards_db() -> + ?l2b(config:get("mem3", "shards_db", "_dbs")). + +users_db() -> + ?l2b(config:get("couch_httpd_auth", "authentication_db", "_users")). + +maybe_redirect(Node) -> + case config:get("mem3.redirects", atom_to_list(Node)) of + undefined -> + Node; + Redirect -> + couch_log:debug("Redirecting push from ~p to ~p", [Node, Redirect]), + list_to_existing_atom(Redirect) + end. diff --git a/src/mem3/src/mem3_sync_event.erl b/src/mem3/src/mem3_sync_event.erl new file mode 100644 index 000000000..7bca23086 --- /dev/null +++ b/src/mem3/src/mem3_sync_event.erl @@ -0,0 +1,86 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_sync_event). +-behaviour(gen_event). +-vsn(1). + +-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, + code_change/3]). + +init(_) -> + net_kernel:monitor_nodes(true), + {ok, nil}. + +handle_event({add_node, Node}, State) when Node =/= node() -> + net_kernel:connect_node(Node), + mem3_sync_nodes:add([Node]), + {ok, State}; + +handle_event({remove_node, Node}, State) -> + mem3_sync:remove_node(Node), + {ok, State}; + +handle_event(_Event, State) -> + {ok, State}. + +handle_call(_Request, State) -> + {ok, ok, State}. 
+ +handle_info({nodeup, Node}, State) -> + Nodes0 = lists:usort([node() | drain_nodeups([Node])]), + Nodes = lists:filter(fun(N) -> lists:member(N, mem3:nodes()) end, Nodes0), + wait_for_rexi(Nodes, 5), + {ok, State}; + +handle_info({nodedown, Node}, State) -> + mem3_sync:remove_node(Node), + {ok, State}; + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +drain_nodeups(Acc) -> + receive + {nodeup, Node} -> + drain_nodeups([Node | Acc]) + after 0 -> + Acc + end. + +wait_for_rexi([], _Retries) -> + ok; +wait_for_rexi(Waiting, Retries) -> + % Hack around rpc:multicall/4 so that we can + % be sure which nodes gave which response + Msg = {call, rexi_server_mon, status, [], group_leader()}, + {Resp, _Bad} = gen_server:multi_call(Waiting, rex, Msg, 1000), + Up = [N || {N, R} <- Resp, R == ok], + NotUp = Waiting -- Up, + case length(Up) > 0 of + true -> + mem3_sync_nodes:add(Up); + false -> ok + end, + case length(NotUp) > 0 andalso Retries > 0 of + true -> + timer:sleep(1000), + wait_for_rexi(NotUp, Retries-1); + false -> + ok + end. diff --git a/src/mem3/src/mem3_sync_event_listener.erl b/src/mem3/src/mem3_sync_event_listener.erl new file mode 100644 index 000000000..7859c3109 --- /dev/null +++ b/src/mem3/src/mem3_sync_event_listener.erl @@ -0,0 +1,309 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mem3_sync_event_listener). +-behavior(couch_event_listener). +-vsn(1). 
+ +-export([ + start_link/0 +]). + +-export([ + init/1, + terminate/2, + handle_event/3, + handle_cast/2, + handle_info/2 +]). + +-include_lib("mem3/include/mem3.hrl"). + +-ifdef(TEST). +-define(RELISTEN_DELAY, 500). +-else. +-define(RELISTEN_DELAY, 5000). +-endif. + +-record(state, { + nodes, + shards, + users, + delay, + frequency, + last_push, + buckets +}). + +%% Calling mem3_sync:push/2 on every update has a measurable performance cost, +%% so we'd like to coalesce multiple update messages from couch_event in to a +%% single push call. Doing this while ensuring both correctness (i.e., no lost +%% updates) and an even load profile is somewhat subtle. This implementation +%% groups updated shards in a list of "buckets" (see bucket_shard/2) and +%% guarantees that each shard is in no more than one bucket at a time - i.e., +%% any update messages received before the shard's current bucket has been +%% pushed will be ignored - thereby reducing the frequency with which a single +%% shard will be pushed. mem3_sync:push/2 is called on all shards in the +%% *oldest* bucket roughly every mem3.sync_frequency milliseconds (see +%% maybe_push_shards/1) to even out the load on mem3_sync. + +start_link() -> + couch_event_listener:start_link(?MODULE, [], [all_dbs]). + +init(_) -> + ok = subscribe_for_config(), + Delay = config:get_integer("mem3", "sync_delay", 5000), + Frequency = config:get_integer("mem3", "sync_frequency", 500), + Buckets = lists:duplicate(Delay div Frequency + 1, sets:new()), + St = #state{ + nodes = mem3_sync:nodes_db(), + shards = mem3_sync:shards_db(), + users = mem3_sync:users_db(), + delay = Delay, + frequency = Frequency, + buckets = Buckets + }, + {ok, St}. + +terminate(_Reason, _State) -> + ok. 
+ +handle_event(NodesDb, updated, #state{nodes = NodesDb} = St) -> + Nodes = mem3:nodes(), + Live = nodes(), + [mem3_sync:push(NodesDb, N) || N <- Nodes, lists:member(N, Live)], + maybe_push_shards(St); +handle_event(ShardsDb, updated, #state{shards = ShardsDb} = St) -> + mem3_sync:push(ShardsDb, mem3_sync:find_next_node()), + maybe_push_shards(St); +handle_event(UsersDb, updated, #state{users = UsersDb} = St) -> + mem3_sync:push(UsersDb, mem3_sync:find_next_node()), + maybe_push_shards(St); +handle_event(<<"shards/", _/binary>> = ShardName, updated, St) -> + Buckets = bucket_shard(ShardName, St#state.buckets), + maybe_push_shards(St#state{buckets=Buckets}); +handle_event(<<"shards/", _:18/binary, _/binary>> = ShardName, deleted, St) -> + mem3_sync:remove_shard(ShardName), + maybe_push_shards(St); +handle_event(_DbName, _Event, St) -> + maybe_push_shards(St). + +handle_cast({set_frequency, Frequency}, St) -> + #state{delay = Delay, buckets = Buckets0} = St, + Buckets1 = rebucket_shards(Delay, Frequency, Buckets0), + maybe_push_shards(St#state{frequency=Frequency, buckets=Buckets1}); +handle_cast({set_delay, Delay}, St) -> + #state{frequency = Frequency, buckets = Buckets0} = St, + Buckets1 = rebucket_shards(Delay, Frequency, Buckets0), + maybe_push_shards(St#state{delay=Delay, buckets=Buckets1}); +handle_cast(Msg, St) -> + couch_log:notice("unexpected cast to mem3_sync_event_listener: ~p", [Msg]), + maybe_push_shards(St). 
+
+handle_info(timeout, St) ->
+    maybe_push_shards(St);
+handle_info({config_change, "mem3", "sync_delay", Value, _}, St) ->
+    set_config(set_delay, Value, "ignoring bad value for mem3.sync_delay"),
+    maybe_push_shards(St);
+handle_info({config_change, "mem3", "sync_frequency", Value, _}, St) ->
+    set_config(set_frequency, Value, "ignoring bad value for mem3.sync_frequency"),
+    maybe_push_shards(St);
+handle_info({gen_event_EXIT, _Handler, _Reason}, St) ->
+    %% The config event handler died; re-subscribe after ?RELISTEN_DELAY ms.
+    erlang:send_after(?RELISTEN_DELAY, self(), restart_config_listener),
+    maybe_push_shards(St);
+handle_info(restart_config_listener, St) ->
+    ok = subscribe_for_config(),
+    maybe_push_shards(St);
+handle_info({get_state, Ref, Caller}, St) ->
+    %% Test-only introspection hook (see capture/1 in the TEST section).
+    Caller ! {Ref, St},
+    {ok, St};
+handle_info(Msg, St) ->
+    couch_log:notice("unexpected info to mem3_sync_event_listener: ~p", [Msg]),
+    maybe_push_shards(St).
+
+%% Parse a config value as an integer and apply it asynchronously via a
+%% cast to ourselves; on a non-integer value log the supplied Error text
+%% and keep the current setting.
+set_config(Cmd, Value, Error) ->
+    try list_to_integer(Value) of
+        IntegerValue ->
+            couch_event_listener:cast(self(), {Cmd, IntegerValue})
+    catch error:badarg ->
+        couch_log:warning("~s: ~p", [Error, Value])
+    end.
+
+%% Add ShardName to the newest (head) bucket unless it is already waiting
+%% in some bucket - this is what guarantees a shard sits in at most one
+%% bucket at a time.
+bucket_shard(ShardName, [B|Bs]=Buckets0) ->
+    case waiting(ShardName, Buckets0) of
+        true -> Buckets0;
+        false -> [sets:add_element(ShardName, B)|Bs]
+    end.
+
+%% True if ShardName is already queued in any bucket.
+waiting(_, []) ->
+    false;
+waiting(ShardName, [B|Bs]) ->
+    case sets:is_element(ShardName, B) of
+        true -> true;
+        false -> waiting(ShardName, Bs)
+    end.
+
+%% Resize the bucket list to (Delay div Frequency + 1) entries, the same
+%% sizing rule used by init/1.
+%%
+%% BUGFIX: both call sites in handle_cast/2 invoke this as
+%% rebucket_shards(Delay, Frequency, Buckets0), but the head previously
+%% bound the arguments as (Frequency, Delay, ...), so the target size was
+%% computed as Frequency div Delay + 1 (e.g. 1 instead of 11 with the
+%% defaults of 5000/500). The parameter names now match the call order.
+rebucket_shards(Delay, Frequency, Buckets0) ->
+    case (Delay div Frequency + 1) - length(Buckets0) of
+        0 ->
+            Buckets0;
+        N when N < 0 ->
+            %% Shrink: merge the abs(N) newest buckets into the next one
+            {ToMerge, [B|Buckets1]} = lists:split(abs(N), Buckets0),
+            [sets:union([B|ToMerge])|Buckets1];
+        M ->
+            %% Grow: prepend M fresh (newest) buckets
+            lists:duplicate(M, sets:new()) ++ Buckets0
+    end.
+
+%% To ensure that mem3_sync:push/2 is indeed called with roughly the frequency
+%% specified by #state.frequency, every message callback must return via a call
+%% to maybe_push_shards/1 rather than directly. All timing coordination - i.e.,
+%% calling mem3_sync:push/2 or setting a proper timeout to ensure that pending
+%% messages aren't dropped in case no further messages arrive - is handled here.
+%% Returns {ok, St, TimeoutMs} in the couch_event_listener callback shape; the
+%% timeout produces the 'timeout' info message when no events arrive.
+maybe_push_shards(#state{last_push=undefined} = St) ->
+    {ok, St#state{last_push=os:timestamp()}, St#state.frequency};
+maybe_push_shards(St) ->
+    #state{frequency=Frequency, last_push=LastPush, buckets=Buckets0} = St,
+    Now = os:timestamp(),
+    Delta = timer:now_diff(Now, LastPush) div 1000,
+    case Delta > Frequency of
+        true ->
+            %% Flush the oldest bucket (list tail) and rotate a fresh one in
+            %% at the head.
+            {Buckets1, [ToPush]} = lists:split(length(Buckets0) - 1, Buckets0),
+            Buckets2 = [sets:new()|Buckets1],
+            %% There's no sets:map/2!
+            sets:fold(
+                fun(ShardName, _) -> push_shard(ShardName) end,
+                undefined,
+                ToPush
+            ),
+            {ok, St#state{last_push=Now, buckets=Buckets2}, Frequency};
+        false ->
+            {ok, St, Frequency - Delta}
+    end.
+
+%% Push one shard to every live node that hosts a replica of it. A shard
+%% whose database was deleted in the meantime is silently skipped.
+push_shard(ShardName) ->
+    try mem3_shards:for_shard_name(ShardName) of
+        Shards ->
+            Live = nodes(),
+            lists:foreach(
+                fun(#shard{node=N}) ->
+                    case lists:member(N, Live) of
+                        true -> mem3_sync:push(ShardName, N);
+                        false -> ok
+                    end
+                end,
+                Shards
+            )
+    catch error:database_does_not_exist ->
+        ok
+    end.
+
+%% Register interest in the two mem3 settings handled in handle_info/2.
+subscribe_for_config() ->
+    config:subscribe_for_changes([
+        {"mem3", "sync_delay"},
+        {"mem3", "sync_frequency"}
+    ]).
+
+-ifdef(TEST).
+-include_lib("couch/include/couch_eunit.hrl").
+
+%% Test fixture: mock couch_event so start_link/0 needs no event server, and
+%% make config_notifier throw on the value "error" to simulate a crashing
+%% config handler (exercised by should_restart_listener/1).
+setup() ->
+    ok = meck:new(couch_event, [passthrough]),
+    ok = meck:expect(couch_event, register_all, ['_'], ok),
+
+    ok = meck:new(config_notifier, [passthrough]),
+    ok = meck:expect(config_notifier, handle_event, [
+        {[{'_', '_', "error", '_'}, '_'], meck:raise(throw, raised_error)},
+        {['_', '_'], meck:passthrough()}
+    ]),
+
+    application:start(config),
+    {ok, Pid} = ?MODULE:start_link(),
+    erlang:unlink(Pid),
+    meck:wait(config_notifier, subscribe, '_', 1000),
+    Pid.
+
+teardown(Pid) ->
+    exit(Pid, shutdown),
+    application:stop(config),
+    (catch meck:unload(couch_event)),
+    (catch meck:unload(config_notifier)),
+    ok.
+
+subscribe_for_config_test_() ->
+    {
+        %% NOTE: fixed typo in description ("Subscrive" -> "Subscribe")
+        "Subscribe for configuration changes",
+        {
+            foreach,
+            fun setup/0, fun teardown/1,
+            [
+                fun should_set_sync_delay/1,
+                fun should_set_sync_frequency/1,
+                fun should_restart_listener/1,
+                fun should_terminate/1
+            ]
+        }
+    }.
+
+%% A config change must propagate into #state.delay (via get_state capture).
+should_set_sync_delay(Pid) ->
+    ?_test(begin
+        config:set("mem3", "sync_delay", "123", false),
+        ?assertMatch(#state{delay = 123}, capture(Pid)),
+        ok
+    end).
+
+%% A config change must propagate into #state.frequency.
+should_set_sync_frequency(Pid) ->
+    ?_test(begin
+        config:set("mem3", "sync_frequency", "456", false),
+        ?assertMatch(#state{frequency = 456}, capture(Pid)),
+        ok
+    end).
+
+%% After the config handler crashes (on "error"), the listener must
+%% re-subscribe; _Pid is unused (was `Pid`, which drew a compiler warning).
+should_restart_listener(_Pid) ->
+    ?_test(begin
+        meck:reset(config_notifier),
+        config:set("mem3", "sync_frequency", "error", false),
+
+        meck:wait(config_notifier, subscribe, '_', 1000),
+        ok
+    end).
+
+%% Killing the config_event manager must take the listener down with it
+%% while the manager itself is restarted by its supervisor.
+should_terminate(Pid) ->
+    ?_test(begin
+        ?assert(is_process_alive(Pid)),
+
+        EventMgr = whereis(config_event),
+
+        RestartFun = fun() -> exit(EventMgr, kill) end,
+        test_util:with_process_restart(config_event, RestartFun),
+
+        ?assertNot(is_process_alive(EventMgr)),
+        ?assertNot(is_process_alive(Pid)),
+        ?assert(is_process_alive(whereis(config_event))),
+        ok
+    end).
+
+%% Poll the listener's state via the get_state info message handled in
+%% handle_info/2; retried by test_util:wait/1 until a reply arrives.
+capture(Pid) ->
+    Ref = make_ref(),
+    WaitFun = fun() ->
+        Pid ! {get_state, Ref, self()},
+        receive
+            {Ref, State} -> State
+        after 0 ->
+            wait
+        end
+    end,
+    test_util:wait(WaitFun).
+
+
+-endif.
diff --git a/src/mem3/src/mem3_sync_nodes.erl b/src/mem3/src/mem3_sync_nodes.erl
new file mode 100644
index 000000000..0a4bffcd2
--- /dev/null
+++ b/src/mem3/src/mem3_sync_nodes.erl
@@ -0,0 +1,115 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_sync_nodes).
+-behaviour(gen_server).
+-vsn(1).
+
+
+-export([start_link/0]).
+-export([add/1]).
+
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+%% Exported only so start_sync/1 can spawn_monitor it by MFA.
+-export([monitor_sync/1]).
+
+
+-record(st, {
+    tid  % ets table tracking in-flight sync jobs, keyed by #job.nodes
+}).
+
+
+%% One in-flight initial sync against a set of nodes.
+-record(job, {
+    nodes,  % list of nodes being synced (the ets key, see keypos below)
+    pid,    % pid of the monitor_sync/1 wrapper process
+    retry   % re-run the sync once the current run finishes?
+}).
+
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+%% Request an (async) initial sync with Nodes; duplicate requests for the
+%% same node set are coalesced (see handle_cast below).
+add(Nodes) ->
+    gen_server:cast(?MODULE, {add, Nodes}).
+
+
+init([]) ->
+    {ok, #st{
+        tid = ets:new(?MODULE, [set, protected, {keypos, #job.nodes}])
+    }}.
+
+
+%% Kill any still-running sync workers on shutdown.
+terminate(_Reason, St) ->
+    [exit(Pid, kill) || #job{pid=Pid} <- ets:tab2list(St#st.tid)],
+    ok.
+
+
+handle_call(Msg, _From, St) ->
+    {stop, {invalid_call, Msg}, invalid_call, St}.
+
+
+%% No job for Nodes: start one. Job running without a retry pending: flag a
+%% retry so updates arriving mid-sync are not lost. Otherwise: already
+%% queued, nothing to do.
+handle_cast({add, Nodes}, #st{tid=Tid}=St) ->
+    case ets:lookup(Tid, Nodes) of
+        [] ->
+            Pid = start_sync(Nodes),
+            ets:insert(Tid, #job{nodes=Nodes, pid=Pid, retry=false});
+        [#job{retry=false}=Job] ->
+            ets:insert(Tid, Job#job{retry=true});
+        _ ->
+            ok
+    end,
+    {noreply, St};
+
+handle_cast(Msg, St) ->
+    {stop, {invalid_cast, Msg}, St}.
+
+
+%% Sync finished cleanly: re-run it if a retry was requested while it ran,
+%% otherwise drop the job. The exit reason is set by monitor_sync/1 below.
+handle_info({'DOWN', _, _, _, {sync_done, Nodes}}, #st{tid=Tid}=St) ->
+    case ets:lookup(Tid, Nodes) of
+        [#job{retry=true}=Job] ->
+            Pid = start_sync(Nodes),
+            ets:insert(Tid, Job#job{pid=Pid, retry=false});
+        _ ->
+            ets:delete(Tid, Nodes)
+    end,
+    {noreply, St};
+
+%% Sync failed: always restart it (retry flag reset).
+handle_info({'DOWN', _, _, _, {sync_error, Nodes}}, #st{tid=Tid}=St) ->
+    Pid = start_sync(Nodes),
+    ets:insert(Tid, #job{nodes=Nodes, pid=Pid, retry=false}),
+    {noreply, St};
+
+handle_info(Msg, St) ->
+    {stop, {invalid_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+%% Spawn a monitored wrapper; its 'DOWN' reason is handled above.
+start_sync(Nodes) ->
+    {Pid, _} = spawn_monitor(?MODULE, monitor_sync, [Nodes]),
+    Pid.
+
+
+%% Runs the actual mem3_sync:initial_sync/1 in a linked child and converts
+%% its exit into a tagged exit reason ({sync_done, Nodes} on normal exit,
+%% {sync_error, Nodes} on anything else) for the 'DOWN' handlers above.
+monitor_sync(Nodes) ->
+    process_flag(trap_exit, true),
+    Pid = spawn_link(mem3_sync, initial_sync, [Nodes]),
+    receive
+        {'EXIT', Pid, normal} ->
+            exit({sync_done, Nodes});
+        _ ->
+            exit({sync_error, Nodes})
+    end.
+
diff --git a/src/mem3/src/mem3_sync_security.erl b/src/mem3/src/mem3_sync_security.erl
new file mode 100644
index 000000000..9edd0ec57
--- /dev/null
+++ b/src/mem3/src/mem3_sync_security.erl
@@ -0,0 +1,107 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_sync_security).
+
+-export([maybe_sync/2, maybe_sync_int/2]).
+-export([go/0, go/1]).
+
+-include_lib("mem3/include/mem3.hrl").
+
+
+%% Kick off an async security-object consistency check between a source
+%% and a target shard; clustered ("shards/...") sources only.
+maybe_sync(#shard{}=Src, #shard{}=Dst) ->
+    case is_local(Src#shard.name) of
+        false ->
+            erlang:spawn(?MODULE, maybe_sync_int, [Src, Dst]);
+        true ->
+            ok
+    end.
+
+%% Compare the security objects of the two given shards; trigger a full
+%% repair (go/1) when they disagree or no majority could be read.
+%% NOTE(review): the 1|2 case assumes exactly the two queried shards answer,
+%% so lists:usort/1 can yield at most two distinct objects - confirm against
+%% fabric:get_all_security/2 worker semantics.
+maybe_sync_int(#shard{name=Name}=Src, Dst) ->
+    DbName = mem3:dbname(Name),
+    case fabric:get_all_security(DbName, [{shards, [Src, Dst]}]) of
+        {ok, WorkerObjs} ->
+            Objs = [Obj || {_Worker, Obj} <- WorkerObjs],
+            case length(lists:usort(Objs)) of
+                1 -> ok;
+                2 -> go(DbName)
+            end;
+        {error, no_majority} ->
+            go(DbName);
+        Else ->
+            Args = [DbName, Else],
+            couch_log:error("Error checking security objects for ~s :: ~p", Args)
+    end.
+
+%% Repair security objects for every database in the cluster.
+go() ->
+    {ok, Dbs} = fabric:all_dbs(),
+    lists:foreach(fun handle_db/1, Dbs).
+
+%% Repair security objects for a single database.
+go(DbName) when is_binary(DbName) ->
+    handle_db(DbName).
+
+%% Fetch the distinct security objects across all shards of DbName and,
+%% when a simple majority exists, write that object back everywhere.
+handle_db(DbName) ->
+    ShardCount = length(mem3:shards(DbName)),
+    case get_all_security(DbName) of
+        {ok, SecObjs} ->
+            case is_ok(SecObjs, ShardCount) of
+                ok ->
+                    ok;
+                {fixable, SecObj} ->
+                    couch_log:info("Sync security object for ~p: ~p", [DbName, SecObj]),
+                    case fabric:set_security(DbName, SecObj) of
+                        ok -> ok;
+                        Error ->
+                            couch_log:error("Error setting security object in ~p: ~p",
+                                [DbName, Error])
+                    end;
+                broken ->
+                    couch_log:error("Bad security object in ~p: ~p", [DbName, SecObjs])
+            end;
+        Error ->
+            couch_log:error("Error getting security objects for ~p: ~p", [
+                DbName, Error])
+    end.
+
+%% Returns {ok, [{SecObj, Count}]} - each distinct security object with the
+%% number of shards reporting it.
+get_all_security(DbName) ->
+    case fabric:get_all_security(DbName) of
+        {ok, SecObjs} ->
+            SecObjsDict = lists:foldl(fun({_, SO}, Acc) ->
+                dict:update_counter(SO, 1, Acc)
+            end, dict:new(), SecObjs),
+            {ok, dict:to_list(SecObjsDict)};
+        Error ->
+            Error
+    end.
+
+%% Classify the [{SecObj, Count}] summary: ok | {fixable, SecObj} | broken.
+is_ok([_], _) ->
+    % One security object is the happy case
+    ok;
+is_ok([_, _] = SecObjs0, ShardCount) ->
+    % Figure out if we have a simple majority of security objects
+    % and if so, use that as the correct value. Otherwise we abort
+    % and rely on human intervention.
+    {Count, SecObj} = lists:max([{C, O} || {O, C} <- SecObjs0]),
+    case Count >= ((ShardCount div 2) + 1) of
+        true -> {fixable, SecObj};
+        false -> broken
+    end;
+is_ok(_, _) ->
+    % Anything else requires human intervention
+    broken.
+
+
+%% Clustered shard files live under the "shards/" prefix; anything else is
+%% a node-local database.
+is_local(<<"shards/", _/binary>>) ->
+    false;
+is_local(_) ->
+    true.
+
diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl
new file mode 100644
index 000000000..2cd444d8c
--- /dev/null
+++ b/src/mem3/src/mem3_util.erl
@@ -0,0 +1,253 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_util).
+
+-export([hash/1, name_shard/2, create_partition_map/5, build_shards/2,
+    n_val/2, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1,
+    shard_info/1, ensure_exists/1, open_db_doc/1]).
+-export([is_deleted/1, rotate_list/2]).
+
+%% do not use outside mem3.
+-export([build_ordered_shards/2, downcast/1]).
+
+-export([create_partition_map/4, name_shard/1]).
+-deprecated({create_partition_map, 4, eventually}).
+-deprecated({name_shard, 1, eventually}).
+
+% 2 bsl 31 = 2^32, i.e. one past the top of the 32-bit hash ring
+-define(RINGTOP, 2 bsl 31).  % CRC32 space
+
+-include_lib("mem3/include/mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+%% CRC32 of a binary, or of the external term format of any other term.
+hash(Item) when is_binary(Item) ->
+    erlang:crc32(Item);
+hash(Item) ->
+    erlang:crc32(term_to_binary(Item)).
+
+name_shard(Shard) ->
+    name_shard(Shard, "").
+
+%% Fill in the canonical "shards/<begin>-<end>/<dbname><suffix>" name for a
+%% #shard{} or #ordered_shard{} from its dbname and range.
+name_shard(#shard{dbname = DbName, range=Range} = Shard, Suffix) ->
+    Name = make_name(DbName, Range, Suffix),
+    Shard#shard{name = ?l2b(Name)};
+
+name_shard(#ordered_shard{dbname = DbName, range=Range} = Shard, Suffix) ->
+    Name = make_name(DbName, Range, Suffix),
+    Shard#ordered_shard{name = ?l2b(Name)}.
+
+%% Build the iolist "shards/<8 hex>-<8 hex>/<dbname><suffix>".
+make_name(DbName, [B,E], Suffix) ->
+    ["shards/", couch_util:to_hex(<<B:32/integer>>), "-",
+        couch_util:to_hex(<<E:32/integer>>), "/", DbName, Suffix].
+
+create_partition_map(DbName, N, Q, Nodes) ->
+    create_partition_map(DbName, N, Q, Nodes, "").
+
+%% Produce N*Q named shards: Q equal hash ranges, each replicated N times,
+%% with nodes assigned round-robin.
+create_partition_map(DbName, N, Q, Nodes, Suffix) ->
+    UniqueShards = make_key_ranges((?RINGTOP) div Q, 0, []),
+    Shards0 = lists:flatten([lists:duplicate(N, S) || S <- UniqueShards]),
+    Shards1 = attach_nodes(Shards0, [], Nodes, []),
+    [name_shard(S#shard{dbname=DbName}, Suffix) || S <- Shards1].
+
+%% Split [0, RINGTOP) into ranges of Increment keys; the final range
+%% absorbs any remainder so the ring is fully covered.
+make_key_ranges(_, CurrentPos, Acc) when CurrentPos >= ?RINGTOP ->
+    Acc;
+make_key_ranges(Increment, Start, Acc) ->
+    case Start + 2*Increment of
+    X when X > ?RINGTOP ->
+        End = ?RINGTOP - 1;
+    _ ->
+        End = Start + Increment - 1
+    end,
+    make_key_ranges(Increment, End+1, [#shard{range=[Start, End]} | Acc]).
+
+%% Assign nodes to shards round-robin, recycling the node list when it
+%% runs out before the shard list does.
+attach_nodes([], Acc, _, _) ->
+    lists:reverse(Acc);
+attach_nodes(Shards, Acc, [], UsedNodes) ->
+    attach_nodes(Shards, Acc, lists:reverse(UsedNodes), []);
+attach_nodes([S | Rest], Acc, [Node | Nodes], UsedNodes) ->
+    attach_nodes(Rest, [S#shard{node=Node} | Acc], Nodes, [Node | UsedNodes]).
+
+%% Read a doc from the shards db ("_dbs" by default); the db handle is
+%% always closed via the try...after.
+open_db_doc(DocId) ->
+    DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
+    {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
+    try couch_db:open_doc(Db, DocId, [ejson_body]) after couch_db:close(Db) end.
+
+write_db_doc(Doc) ->
+    DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
+    write_db_doc(DbName, Doc, true).
+
+%% Write Doc unless an identical body is already present. ShouldMutate is
+%% true on the first attempt only; after a conflict we re-read once to
+%% distinguish a replication race (same body arrived) from a real conflict.
+write_db_doc(DbName, #doc{id=Id, body=Body} = Doc, ShouldMutate) ->
+    {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
+    try couch_db:open_doc(Db, Id, [ejson_body]) of
+    {ok, #doc{body = Body}} ->
+        % the doc is already in the desired state, we're done here
+        ok;
+    {not_found, _} when ShouldMutate ->
+        try couch_db:update_doc(Db, Doc, []) of
+        {ok, _} ->
+            ok
+        catch conflict ->
+            % check to see if this was a replication race or a different edit
+            write_db_doc(DbName, Doc, false)
+        end;
+    _ ->
+        % the doc already exists in a different state
+        conflict
+    after
+        couch_db:close(Db)
+    end.
+
+delete_db_doc(DocId) ->
+    gen_server:cast(mem3_shards, {cache_remove, DocId}),
+    DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
+    delete_db_doc(DbName, DocId, true).
+
+%% Delete every live leaf revision of DocId; mirrors the single-retry
+%% conflict handling of write_db_doc/3 above.
+delete_db_doc(DbName, DocId, ShouldMutate) ->
+    {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
+    {ok, Revs} = couch_db:open_doc_revs(Db, DocId, all, []),
+    try [Doc#doc{deleted=true} || {ok, #doc{deleted=false}=Doc} <- Revs] of
+    [] ->
+        not_found;
+    Docs when ShouldMutate ->
+        try couch_db:update_docs(Db, Docs, []) of
+        {ok, _} ->
+            ok
+        catch conflict ->
+            % check to see if this was a replication race or if leafs survived
+            delete_db_doc(DbName, DocId, false)
+        end;
+    _ ->
+        % we have live leafs that we aren't allowed to delete. let's bail
+        conflict
+    after
+        couch_db:close(Db)
+    end.
+
+%% Always returns original #shard records.
+-spec build_shards(binary(), list()) -> [#shard{}].
+build_shards(DbName, DocProps) ->
+    build_shards_by_node(DbName, DocProps).
+
+%% Will return #ordered_shard records if by_node and by_range
+%% are symmetrical, #shard records otherwise.
+-spec build_ordered_shards(binary(), list()) ->
+    [#shard{}] | [#ordered_shard{}].
+build_ordered_shards(DbName, DocProps) ->
+    ByNode = build_shards_by_node(DbName, DocProps),
+    ByRange = build_shards_by_range(DbName, DocProps),
+    %% Only trust the ordered (by_range) view when it describes exactly the
+    %% same shard set as by_node; otherwise fall back to plain shards.
+    Symmetrical = lists:sort(ByNode) =:= lists:sort(downcast(ByRange)),
+    case Symmetrical of
+        true  -> ByRange;
+        false -> ByNode
+    end.
+
+%% Expand the doc's by_node mapping {Node -> ["<hexB>-<hexE>", ...]} into
+%% named #shard{} records.
+build_shards_by_node(DbName, DocProps) ->
+    {ByNode} = couch_util:get_value(<<"by_node">>, DocProps, {[]}),
+    Suffix = couch_util:get_value(<<"shard_suffix">>, DocProps, ""),
+    lists:flatmap(fun({Node, Ranges}) ->
+        lists:map(fun(Range) ->
+            [B,E] = string:tokens(?b2l(Range), "-"),
+            Beg = httpd_util:hexlist_to_integer(B),
+            End = httpd_util:hexlist_to_integer(E),
+            name_shard(#shard{
+                dbname = DbName,
+                node = to_atom(Node),
+                range = [Beg, End]
+            }, Suffix)
+        end, Ranges)
+    end, ByNode).
+
+%% Expand the doc's by_range mapping {Range -> [Node, ...]} into named
+%% #ordered_shard{} records; order is each node's 1-based position in the
+%% range's node list.
+build_shards_by_range(DbName, DocProps) ->
+    {ByRange} = couch_util:get_value(<<"by_range">>, DocProps, {[]}),
+    Suffix = couch_util:get_value(<<"shard_suffix">>, DocProps, ""),
+    lists:flatmap(fun({Range, Nodes}) ->
+        lists:map(fun({Node, Order}) ->
+            [B,E] = string:tokens(?b2l(Range), "-"),
+            Beg = httpd_util:hexlist_to_integer(B),
+            End = httpd_util:hexlist_to_integer(E),
+            name_shard(#ordered_shard{
+                dbname = DbName,
+                node = to_atom(Node),
+                range = [Beg, End],
+                order = Order
+            }, Suffix)
+        end, lists:zip(Nodes, lists:seq(1, length(Nodes))))
+    end, ByRange).
+
+%% NOTE(review): list_to_atom/1 here creates atoms from doc-supplied node
+%% names; atoms are never GC'd, so pathological input could grow the atom
+%% table. Node names come from the admin-controlled shards db, so this is
+%% presumably acceptable - confirm before exposing to untrusted input.
+to_atom(Node) when is_binary(Node) ->
+    list_to_atom(binary_to_list(Node));
+to_atom(Node) when is_atom(Node) ->
+    Node.
+
+to_integer(N) when is_integer(N) ->
+    N;
+to_integer(N) when is_binary(N) ->
+    list_to_integer(binary_to_list(N));
+to_integer(N) when is_list(N) ->
+    list_to_integer(N).
+
+%% Clamp a requested replica count N to [1, NodeCount]; 'undefined' falls
+%% back to the cluster.n config setting (default "3").
+n_val(undefined, NodeCount) ->
+    n_val(config:get("cluster", "n", "3"), NodeCount);
+n_val(N, NodeCount) when is_list(N) ->
+    n_val(list_to_integer(N), NodeCount);
+n_val(N, NodeCount) when is_integer(NodeCount), N > NodeCount ->
+    couch_log:error("Request to create N=~p DB but only ~p node(s)", [N, NodeCount]),
+    NodeCount;
+n_val(N, _) when N < 1 ->
+    1;
+n_val(N, _) ->
+    N.
+
+%% [{n, ReplicaCount}, {q, RangeCount}] for DbName.
+shard_info(DbName) ->
+    [{n, mem3:n(DbName)},
+     {q, length(mem3:shards(DbName)) div mem3:n(DbName)}].
+
+%% Open DbName, creating it as a system db if it does not exist yet.
+ensure_exists(DbName) when is_list(DbName) ->
+    ensure_exists(list_to_binary(DbName));
+ensure_exists(DbName) ->
+    case couch_db:open(DbName, [nologifmissing, sys_db | [?ADMIN_CTX]]) of
+    {ok, Db} ->
+        {ok, Db};
+    _ ->
+        couch_server:create(DbName, [?ADMIN_CTX])
+    end.
+
+
+%% Read the deleted flag from a changes-feed row, accepting both the
+%% binary and the atom key.
+is_deleted(Change) ->
+    case couch_util:get_value(<<"deleted">>, Change) of
+    undefined ->
+        % keep backwards compatibility for a while
+        couch_util:get_value(deleted, Change, false);
+    Else ->
+        Else
+    end.
+
+%% Deterministically rotate List by an offset derived from hashing Key;
+%% used to spread load across replicas consistently per key.
+rotate_list(_Key, []) ->
+    [];
+rotate_list(Key, List) when not is_binary(Key) ->
+    rotate_list(term_to_binary(Key), List);
+rotate_list(Key, List) ->
+    {H, T} = lists:split(erlang:crc32(Key) rem length(List), List),
+    T ++ H.
+
+%% Convert #ordered_shard{} records (or lists thereof) back into plain
+%% #shard{} records; plain shards pass through unchanged.
+downcast(#shard{}=S) ->
+    S;
+downcast(#ordered_shard{}=S) ->
+    #shard{
+        name = S#ordered_shard.name,
+        node = S#ordered_shard.node,
+        dbname = S#ordered_shard.dbname,
+        range = S#ordered_shard.range,
+        ref = S#ordered_shard.ref
+    };
+downcast(Shards) when is_list(Shards) ->
+    [downcast(Shard) || Shard <- Shards].
diff --git a/src/mem3/test/01-config-default.ini b/src/mem3/test/01-config-default.ini
new file mode 100644
index 000000000..dde92ce2d
--- /dev/null
+++ b/src/mem3/test/01-config-default.ini
@@ -0,0 +1,14 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. 
You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+[cluster]
+n=3
diff --git a/src/mem3/test/mem3_util_test.erl b/src/mem3/test/mem3_util_test.erl
new file mode 100644
index 000000000..340a58a63
--- /dev/null
+++ b/src/mem3/test/mem3_util_test.erl
@@ -0,0 +1,167 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_util_test).
+
+-include("mem3.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+%% Known-answer (CRC32) values pinning mem3_util:hash/1 for the main input
+%% kinds: integer, string, binary, and atom.
+hash_test() ->
+    ?assertEqual(1624516141,mem3_util:hash(0)),
+    ?assertEqual(3816901808,mem3_util:hash("0")),
+    ?assertEqual(3523407757,mem3_util:hash(<<0>>)),
+    ?assertEqual(4108050209,mem3_util:hash(<<"0">>)),
+    ?assertEqual(3094724072,mem3_util:hash(zero)),
+    ok.
+
+%% name_shard/2 requires dbname and range to be set (function_clause
+%% otherwise) and produces the canonical shards/<hexB>-<hexE>/<db><suffix>.
+name_shard_test() ->
+    Shard1 = #shard{},
+    ?assertError(function_clause, mem3_util:name_shard(Shard1, ".1234")),
+
+    Shard2 = #shard{dbname = <<"testdb">>, range = [0,100]},
+    #shard{name=Name2} = mem3_util:name_shard(Shard2, ".1234"),
+    ?assertEqual(<<"shards/00000000-00000064/testdb.1234">>, Name2),
+
+    ok.
+
+%% N*Q shards are produced; with N=Q=1 a single full-ring shard results.
+create_partition_map_test() ->
+    {DbName1, N1, Q1, Nodes1} = {<<"testdb1">>, 3, 4, [a,b,c,d]},
+    Map1 = mem3_util:create_partition_map(DbName1, N1, Q1, Nodes1),
+    ?assertEqual(12, length(Map1)),
+
+    {DbName2, N2, Q2, Nodes2} = {<<"testdb2">>, 1, 1, [a,b,c,d]},
+    [#shard{name=Name2,node=Node2}] = Map2 =
+        mem3_util:create_partition_map(DbName2, N2, Q2, Nodes2, ".1234"),
+    ?assertEqual(1, length(Map2)),
+    ?assertEqual(<<"shards/00000000-ffffffff/testdb2.1234">>, Name2),
+    ?assertEqual(a, Node2),
+    ok.
+
+%% An 8-range, single-node dbs-doc body must expand into the matching
+%% eight #shard{} records (compared against ExpectedShards1 below).
+build_shards_test() ->
+    DocProps1 =
+      [{<<"changelog">>,
+        [[<<"add">>,<<"00000000-1fffffff">>,
+          <<"bigcouch@node.local">>],
+         [<<"add">>,<<"20000000-3fffffff">>,
+          <<"bigcouch@node.local">>],
+         [<<"add">>,<<"40000000-5fffffff">>,
+          <<"bigcouch@node.local">>],
+         [<<"add">>,<<"60000000-7fffffff">>,
+          <<"bigcouch@node.local">>],
+         [<<"add">>,<<"80000000-9fffffff">>,
+          <<"bigcouch@node.local">>],
+         [<<"add">>,<<"a0000000-bfffffff">>,
+          <<"bigcouch@node.local">>],
+         [<<"add">>,<<"c0000000-dfffffff">>,
+          <<"bigcouch@node.local">>],
+         [<<"add">>,<<"e0000000-ffffffff">>,
+          <<"bigcouch@node.local">>]]},
+       {<<"by_node">>,
+        {[{<<"bigcouch@node.local">>,
+           [<<"00000000-1fffffff">>,<<"20000000-3fffffff">>,
+            <<"40000000-5fffffff">>,<<"60000000-7fffffff">>,
+            <<"80000000-9fffffff">>,<<"a0000000-bfffffff">>,
+            <<"c0000000-dfffffff">>,<<"e0000000-ffffffff">>]}]}},
+       {<<"by_range">>,
+        {[{<<"00000000-1fffffff">>,[<<"bigcouch@node.local">>]},
+          {<<"20000000-3fffffff">>,[<<"bigcouch@node.local">>]},
+          {<<"40000000-5fffffff">>,[<<"bigcouch@node.local">>]},
+          {<<"60000000-7fffffff">>,[<<"bigcouch@node.local">>]},
+          {<<"80000000-9fffffff">>,[<<"bigcouch@node.local">>]},
+          {<<"a0000000-bfffffff">>,[<<"bigcouch@node.local">>]},
+          {<<"c0000000-dfffffff">>,[<<"bigcouch@node.local">>]},
+          {<<"e0000000-ffffffff">>,[<<"bigcouch@node.local">>]}]}}],
+    Shards1 = mem3_util:build_shards(<<"testdb1">>, DocProps1),
+    ExpectedShards1 =
+        [{shard,<<"shards/00000000-1fffffff/testdb1">>,
+    
'bigcouch@node.local',<<"testdb1">>,
+              [0,536870911],
+              undefined},
+         {shard,<<"shards/20000000-3fffffff/testdb1">>,
+              'bigcouch@node.local',<<"testdb1">>,
+              [536870912,1073741823],
+              undefined},
+         {shard,<<"shards/40000000-5fffffff/testdb1">>,
+              'bigcouch@node.local',<<"testdb1">>,
+              [1073741824,1610612735],
+              undefined},
+         {shard,<<"shards/60000000-7fffffff/testdb1">>,
+              'bigcouch@node.local',<<"testdb1">>,
+              [1610612736,2147483647],
+              undefined},
+         {shard,<<"shards/80000000-9fffffff/testdb1">>,
+              'bigcouch@node.local',<<"testdb1">>,
+              [2147483648,2684354559],
+              undefined},
+         {shard,<<"shards/a0000000-bfffffff/testdb1">>,
+              'bigcouch@node.local',<<"testdb1">>,
+              [2684354560,3221225471],
+              undefined},
+         {shard,<<"shards/c0000000-dfffffff/testdb1">>,
+              'bigcouch@node.local',<<"testdb1">>,
+              [3221225472,3758096383],
+              undefined},
+         {shard,<<"shards/e0000000-ffffffff/testdb1">>,
+              'bigcouch@node.local',<<"testdb1">>,
+              [3758096384,4294967295],
+              undefined}],
+    ?assertEqual(ExpectedShards1, Shards1),
+    ok.
+
+
+%% n_val tests
+
+%% n_val/2 with explicit integers: pass-through, clamp-low (to 1), and
+%% clamp-high (to the node count, which also logs via couch_log - mocked).
+nval_test_() ->
+    {"n_val tests explicit",
+        [
+            {setup,
+                fun () ->
+                    meck:new([couch_log]),
+                    meck:expect(couch_log, error, fun(_, _) -> ok end),
+                    ok
+                end,
+                fun (_) -> meck:unload([couch_log]) end,
+                [
+                    ?_assertEqual(2, mem3_util:n_val(2,4)),
+                    ?_assertEqual(1, mem3_util:n_val(-1,4)),
+                    ?_assertEqual(4, mem3_util:n_val(6,4))
+                ]
+            }
+        ]
+    }.
+
+
+%% Start a real config server backed by 01-config-default.ini ([cluster] n=3).
+config_01_setup() ->
+    Ini = filename:join([code:lib_dir(mem3, test), "01-config-default.ini"]),
+    {ok, Pid} = config:start_link([Ini]),
+    Pid.
+
+config_teardown(_Pid) ->
+    config:stop().
+
+
+n_val_test_() ->
+    {"n_val tests with config",
+        [
+            {setup,
+                fun config_01_setup/0,
+                fun config_teardown/1,
+                fun(Pid) ->
+                    {with, Pid, [
+                        fun n_val_1/1
+                    ]}
+                end}
+        ]
+    }.
+
+%% 'undefined' resolves through the config setting above (n=3).
+n_val_1(_Pid) ->
+    ?assertEqual(3, mem3_util:n_val(undefined, 4)).