summaryrefslogtreecommitdiff
path: root/src/mem3/src/mem3_util.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mem3/src/mem3_util.erl')
-rw-r--r--src/mem3/src/mem3_util.erl86
1 files changed, 77 insertions, 9 deletions
diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl
index 3fc9b4f8e..28cb17778 100644
--- a/src/mem3/src/mem3_util.erl
+++ b/src/mem3/src/mem3_util.erl
@@ -14,8 +14,10 @@
-export([name_shard/2, create_partition_map/5, build_shards/2,
n_val/2, q_val/1, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1,
- shard_info/1, ensure_exists/1, open_db_doc/1]).
+ shard_info/1, ensure_exists/1, open_db_doc/1, get_or_create_db/2]).
-export([is_deleted/1, rotate_list/2]).
+-export([get_shard_opts/1, get_engine_opt/1, get_props_opt/1]).
+-export([get_shard_props/1, find_dirty_shards/0]).
-export([
iso8601_timestamp/0,
live_nodes/0,
@@ -87,13 +89,11 @@ attach_nodes([S | Rest], Acc, [Node | Nodes], UsedNodes) ->
attach_nodes(Rest, [S#shard{node=Node} | Acc], Nodes, [Node | UsedNodes]).
open_db_doc(DocId) ->
- DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
- {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
+ {ok, Db} = couch_db:open(mem3_sync:shards_db(), [?ADMIN_CTX]),
try couch_db:open_doc(Db, DocId, [ejson_body]) after couch_db:close(Db) end.
write_db_doc(Doc) ->
- DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
- write_db_doc(DbName, Doc, true).
+ write_db_doc(mem3_sync:shards_db(), Doc, true).
write_db_doc(DbName, #doc{id=Id, body=Body} = Doc, ShouldMutate) ->
{ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
@@ -118,8 +118,7 @@ write_db_doc(DbName, #doc{id=Id, body=Body} = Doc, ShouldMutate) ->
delete_db_doc(DocId) ->
gen_server:cast(mem3_shards, {cache_remove, DocId}),
- DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
- delete_db_doc(DbName, DocId, true).
+ delete_db_doc(mem3_sync:shards_db(), DocId, true).
delete_db_doc(DbName, DocId, ShouldMutate) ->
{ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
@@ -324,7 +323,7 @@ live_nodes() ->
% which could be a while.
%
replicate_dbs_to_all_nodes(Timeout) ->
- DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
+ DbName = mem3_sync:shards_db(),
Targets= mem3_util:live_nodes() -- [node()],
Res = [start_replication(node(), T, DbName, Timeout) || T <- Targets],
collect_replication_results(Res, Timeout).
@@ -335,7 +334,7 @@ replicate_dbs_to_all_nodes(Timeout) ->
% them until they are all done.
%
replicate_dbs_from_all_nodes(Timeout) ->
- DbName = ?l2b(config:get("mem3", "shards_db", "_dbs")),
+ DbName = mem3_sync:shards_db(),
Sources = mem3_util:live_nodes() -- [node()],
Res = [start_replication(S, node(), DbName, Timeout) || S <- Sources],
collect_replication_results(Res, Timeout).
@@ -509,6 +508,75 @@ sort_ranges_fun({B1, _}, {B2, _}) ->
B1 =< B2.
+get_or_create_db(DbName, Options) ->
+ case couch_db:open_int(DbName, Options) of
+ {ok, _} = OkDb ->
+ OkDb;
+ {not_found, no_db_file} ->
+ try
+ DbOpts = case mem3:dbname(DbName) of
+ DbName -> [];
+ MDbName -> mem3_shards:opts_for_db(MDbName)
+ end,
+ Options1 = [{create_if_missing, true} | Options],
+ Options2 = merge_opts(DbOpts, Options1),
+ couch_db:open_int(DbName, Options2)
+ catch error:database_does_not_exist ->
+ throw({error, missing_target})
+ end;
+ Else ->
+ Else
+ end.
+
+
+%% merge two proplists, atom options only valid in Old
+merge_opts(New, Old) ->
+ lists:foldl(fun({Key, Val}, Acc) ->
+ lists:keystore(Key, 1, Acc, {Key, Val})
+ end, Old, New).
+
+
+get_shard_props(ShardName) ->
+ case couch_db:open_int(ShardName, []) of
+ {ok, Db} ->
+ Props = case couch_db_engine:get_props(Db) of
+ undefined -> [];
+ Else -> Else
+ end,
+ %% We don't normally store the default engine name
+ EngineProps = case couch_db_engine:get_engine(Db) of
+ couch_bt_engine ->
+ [];
+ EngineName ->
+ [{engine, EngineName}]
+ end,
+ [{props, Props} | EngineProps];
+ {not_found, _} ->
+ not_found;
+ Else ->
+ Else
+ end.
+
+
+find_dirty_shards() ->
+ mem3_shards:fold(fun(#shard{node=Node, name=Name, opts=Opts}=Shard, Acc) ->
+ case Opts of
+ [] ->
+ Acc;
+ [{props, []}] ->
+ Acc;
+ _ ->
+ Props = rpc:call(Node, ?MODULE, get_shard_props, [Name]),
+ case Props =:= Opts of
+ true ->
+ Acc;
+ false ->
+ [{Shard, Props} | Acc]
+ end
+ end
+ end, []).
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").