diff options
author | Matthias Radestock <matthias@lshift.net> | 2010-02-01 20:23:01 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2010-02-01 20:23:01 +0000 |
commit | 8a47f1821ac5e532fa5c8185107f42e8c8142848 (patch) | |
tree | b8c70cb20c63fa39a9935dc22e8fc6ff5ed2b93c | |
parent | bd5e23e080330d8a4bb0debfa10fd6108c3701f2 (diff) | |
download | rabbitmq-server-8a47f1821ac5e532fa5c8185107f42e8c8142848.tar.gz |
make pg_local do what it is supposed to
-rw-r--r-- | src/pg_local.erl | 315 |
1 files changed, 71 insertions, 244 deletions
diff --git a/src/pg_local.erl b/src/pg_local.erl index 58a3b798..7f771f74 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -1,4 +1,20 @@ +%% This file is a copy of pg2.erl from the R13B-3 Erlang/OTP +%% distribution, with the following modifications: %% +%% 1) Process groups are node-local only. +%% +%% 2) Groups are created/deleted implicitly. +%% +%% 3) 'join' and 'leave' are asynchronous. +%% +%% 4) the type specs of the exported non-callback functions have been +%% extracted into a separate, guarded section, and rewritten in +%% old-style spec syntax, for better compatibility with older +%% versions of Erlang/OTP. The remaining type specs have been +%% removed. + +%% All modifications are (C) 2010 LShift Ltd. + %% %CopyrightBegin% %% %% Copyright Ericsson AB 1997-2009. All Rights Reserved. @@ -18,138 +34,49 @@ %% -module(pg_local). --export([create/1, delete/1, join/2, leave/2]). --export([get_members/1, get_local_members/1]). --export([get_closest_pid/1, which_groups/0]). +-export([join/2, leave/2, get_members/1]). -export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2, terminate/2]). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(name() :: term()). + +-spec(start_link/0 :: () -> {'ok', pid()} | {'error', term()}). +-spec(start/0 :: () -> {'ok', pid()} | {'error', term()}). +-spec(join/2 :: (name(), pid()) -> 'ok'). +-spec(leave/2 :: (name(), pid()) -> 'ok'). +-spec(get_members/1 :: (name()) -> [pid()]). + +-endif. + +%%---------------------------------------------------------------------------- + %%% As of R13B03 monitors are used instead of links. %%% %%% Exported functions %%% --spec start_link() -> {'ok', pid()} | {'error', term()}. - start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). --spec start() -> {'ok', pid()} | {'error', term()}. - start() -> ensure_started(). --spec create(term()) -> 'ok'. - -create(Name) -> - ensure_started(), - case ets:member(pg2_table, {group, Name}) of - false -> - global:trans({{?MODULE, Name}, self()}, - fun() -> - gen_server:multi_call(?MODULE, {create, Name}) - end), - ok; - true -> - ok - end. - --type name() :: term(). - --spec delete(name()) -> 'ok'. - -delete(Name) -> - ensure_started(), - global:trans({{?MODULE, Name}, self()}, - fun() -> - gen_server:multi_call(?MODULE, {delete, Name}) - end), - ok. - --spec join(name(), pid()) -> 'ok' | {'error', {'no_such_group', term()}}. - join(Name, Pid) when is_pid(Pid) -> ensure_started(), - case ets:member(pg2_table, {group, Name}) of - false -> - {error, {no_such_group, Name}}; - true -> - global:trans({{?MODULE, Name}, self()}, - fun() -> - gen_server:multi_call(?MODULE, - {join, Name, Pid}) - end), - ok - end. - --spec leave(name(), pid()) -> 'ok' | {'error', {'no_such_group', name()}}. + gen_server:cast(?MODULE, {join, Name, Pid}). leave(Name, Pid) when is_pid(Pid) -> ensure_started(), - case ets:member(pg2_table, {group, Name}) of - false -> - {error, {no_such_group, Name}}; - true -> - global:trans({{?MODULE, Name}, self()}, - fun() -> - gen_server:multi_call(?MODULE, - {leave, Name, Pid}) - end), - ok - end. - --type get_members_ret() :: [pid()] | {'error', {'no_such_group', name()}}. + gen_server:cast(?MODULE, {leave, Name, Pid}). --spec get_members(name()) -> get_members_ret(). - get_members(Name) -> ensure_started(), - case ets:member(pg2_table, {group, Name}) of - true -> - group_members(Name); - false -> - {error, {no_such_group, Name}} - end. - --spec get_local_members(name()) -> get_members_ret(). - -get_local_members(Name) -> - ensure_started(), - case ets:member(pg2_table, {group, Name}) of - true -> - local_group_members(Name); - false -> - {error, {no_such_group, Name}} - end. - --spec which_groups() -> [name()]. - -which_groups() -> - ensure_started(), - all_groups(). - --type gcp_error_reason() :: {'no_process', term()} | {'no_such_group', term()}. - --spec get_closest_pid(term()) -> pid() | {'error', gcp_error_reason()}. - -get_closest_pid(Name) -> - case get_local_members(Name) of - [Pid] -> - Pid; - [] -> - {_,_,X} = erlang:now(), - case get_members(Name) of - [] -> {error, {no_process, Name}}; - Members -> - lists:nth((X rem length(Members))+1, Members) - end; - Members when is_list(Members) -> - {_,_,X} = erlang:now(), - lists:nth((X rem length(Members))+1, Members); - Else -> - Else - end. + group_members(Name). %%% %%% Callback functions from gen_server @@ -157,162 +84,95 @@ get_closest_pid(Name) -> -record(state, {}). --spec init([]) -> {'ok', #state{}}. - init([]) -> - Ns = nodes(), - net_kernel:monitor_nodes(true), - lists:foreach(fun(N) -> - {?MODULE, N} ! {new_pg2, node()}, - self() ! {nodeup, N} - end, Ns), - pg2_table = ets:new(pg2_table, [ordered_set, protected, named_table]), + pg_local_table = ets:new(pg_local_table, [ordered_set, protected, named_table]), {ok, #state{}}. --type call() :: {'create', name()} - | {'delete', name()} - | {'join', name(), pid()} - | {'leave', name(), pid()}. - --spec handle_call(call(), _, #state{}) -> - {'reply', 'ok', #state{}}. - -handle_call({create, Name}, _From, S) -> - assure_group(Name), - {reply, ok, S}; -handle_call({join, Name, Pid}, _From, S) -> - ets:member(pg2_table, {group, Name}) andalso join_group(Name, Pid), - {reply, ok, S}; -handle_call({leave, Name, Pid}, _From, S) -> - ets:member(pg2_table, {group, Name}) andalso leave_group(Name, Pid), - {reply, ok, S}; -handle_call({delete, Name}, _From, S) -> - delete_group(Name), - {reply, ok, S}; handle_call(Request, From, S) -> - error_logger:warning_msg("The pg2 server received an unexpected message:\n" + error_logger:warning_msg("The pg_local server received an unexpected message:\n" "handle_call(~p, ~p, _)\n", [Request, From]), {noreply, S}. --type all_members() :: [[name(),...]]. --type cast() :: {'exchange', node(), all_members()} - | {'del_member', name(), pid()}. - --spec handle_cast(cast(), #state{}) -> {'noreply', #state{}}. - -handle_cast({exchange, _Node, List}, S) -> - store(List), +handle_cast({join, Name, Pid}, S) -> + join_group(Name, Pid), + {noreply, S}; +handle_cast({leave, Name, Pid}, S) -> + leave_group(Name, Pid), {noreply, S}; handle_cast(_, S) -> - %% Ignore {del_member, Name, Pid}. {noreply, S}. --spec handle_info(tuple(), #state{}) -> {'noreply', #state{}}. - handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) -> member_died(MonitorRef), {noreply, S}; -handle_info({nodeup, Node}, S) -> - gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}), - {noreply, S}; -handle_info({new_pg2, Node}, S) -> - gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}), - {noreply, S}; handle_info(_, S) -> {noreply, S}. --spec terminate(term(), #state{}) -> 'ok'. - terminate(_Reason, _S) -> - true = ets:delete(pg2_table), + true = ets:delete(pg_local_table), ok. %%% %%% Local functions %%% -%%% One ETS table, pg2_table, is used for bookkeeping. The type of the +%%% One ETS table, pg_local_table, is used for bookkeeping. The type of the %%% table is ordered_set, and the fast matching of partially %%% instantiated keys is used extensively. %%% -%%% {{group, Name}} -%%% Process group Name. -%%% {{ref, Pid}, RPid, MonitorRef, Counter} +%%% {{ref, Pid}, MonitorRef, Counter} %%% {{ref, MonitorRef}, Pid} -%%% Each process has one monitor. Sometimes a process is spawned to -%%% monitor the pid (RPid). Counter is incremented when the Pid joins -%%% some group. -%%% {{member, Name, Pid}, GroupCounter} -%%% {{local_member, Name, Pid}} +%%% Each process has one monitor. Counter is incremented when the +%%% Pid joins some group. +%%% {{member, Name, Pid}, _} %%% Pid is a member of group Name, GroupCounter is incremented when the %%% Pid joins the group Name. %%% {{pid, Pid, Name}} %%% Pid is a member of group Name. -store(List) -> - _ = [assure_group(Name) andalso [join_group(Name, P) || P <- Members] || - [Name, Members] <- List], - ok. - -assure_group(Name) -> - Key = {group, Name}, - ets:member(pg2_table, Key) orelse true =:= ets:insert(pg2_table, {Key}). - -delete_group(Name) -> - _ = [leave_group(Name, Pid) || Pid <- group_members(Name)], - true = ets:delete(pg2_table, {group, Name}), - ok. - member_died(Ref) -> - [{{ref, Ref}, Pid}] = ets:lookup(pg2_table, {ref, Ref}), + [{{ref, Ref}, Pid}] = ets:lookup(pg_local_table, {ref, Ref}), Names = member_groups(Pid), _ = [leave_group(Name, P) || Name <- Names, P <- member_in_group(Pid, Name)], - %% Kept for backward compatibility with links. Can be removed, eventually. - _ = [gen_server:abcast(nodes(), ?MODULE, {del_member, Name, Pid}) || - Name <- Names], ok. join_group(Name, Pid) -> Ref_Pid = {ref, Pid}, - try _ = ets:update_counter(pg2_table, Ref_Pid, {4, +1}) + try _ = ets:update_counter(pg_local_table, Ref_Pid, {3, +1}) catch _:_ -> - {RPid, Ref} = do_monitor(Pid), - true = ets:insert(pg2_table, {Ref_Pid, RPid, Ref, 1}), - true = ets:insert(pg2_table, {{ref, Ref}, Pid}) + Ref = erlang:monitor(process, Pid), + true = ets:insert(pg_local_table, {Ref_Pid, Ref, 1}), + true = ets:insert(pg_local_table, {{ref, Ref}, Pid}) end, Member_Name_Pid = {member, Name, Pid}, - try _ = ets:update_counter(pg2_table, Member_Name_Pid, {2, +1}) + try _ = ets:update_counter(pg_local_table, Member_Name_Pid, {2, +1}) catch _:_ -> - true = ets:insert(pg2_table, {Member_Name_Pid, 1}), - _ = [ets:insert(pg2_table, {{local_member, Name, Pid}}) || - node(Pid) =:= node()], - true = ets:insert(pg2_table, {{pid, Pid, Name}}) + true = ets:insert(pg_local_table, {Member_Name_Pid, 1}), + true = ets:insert(pg_local_table, {{pid, Pid, Name}}) end. leave_group(Name, Pid) -> Member_Name_Pid = {member, Name, Pid}, - try ets:update_counter(pg2_table, Member_Name_Pid, {2, -1}) of + try ets:update_counter(pg_local_table, Member_Name_Pid, {2, -1}) of N -> if N =:= 0 -> - true = ets:delete(pg2_table, {pid, Pid, Name}), - _ = [ets:delete(pg2_table, {local_member, Name, Pid}) || - node(Pid) =:= node()], - true = ets:delete(pg2_table, Member_Name_Pid); + true = ets:delete(pg_local_table, {pid, Pid, Name}), + true = ets:delete(pg_local_table, Member_Name_Pid); true -> ok end, Ref_Pid = {ref, Pid}, - case ets:update_counter(pg2_table, Ref_Pid, {4, -1}) of + case ets:update_counter(pg_local_table, Ref_Pid, {3, -1}) of 0 -> - [{Ref_Pid,RPid,Ref,0}] = ets:lookup(pg2_table, Ref_Pid), - true = ets:delete(pg2_table, {ref, Ref}), - true = ets:delete(pg2_table, Ref_Pid), + [{Ref_Pid,Ref,0}] = ets:lookup(pg_local_table, Ref_Pid), + true = ets:delete(pg_local_table, {ref, Ref}), + true = ets:delete(pg_local_table, Ref_Pid), true = erlang:demonitor(Ref, [flush]), - kill_monitor_proc(RPid, Pid); + ok; _ -> ok end @@ -320,57 +180,24 @@ leave_group(Name, Pid) -> ok end. -all_members() -> - [[G, group_members(G)] || G <- all_groups()]. - group_members(Name) -> [P || - [P, N] <- ets:match(pg2_table, {{member, Name, '$1'},'$2'}), + [P, N] <- ets:match(pg_local_table, {{member, Name, '$1'},'$2'}), _ <- lists:seq(1, N)]. -local_group_members(Name) -> - [P || - [Pid] <- ets:match(pg2_table, {{local_member, Name, '$1'}}), - P <- member_in_group(Pid, Name)]. - member_in_group(Pid, Name) -> - [{{member, Name, Pid}, N}] = ets:lookup(pg2_table, {member, Name, Pid}), + [{{member, Name, Pid}, N}] = ets:lookup(pg_local_table, {member, Name, Pid}), lists:duplicate(N, Pid). member_groups(Pid) -> - [Name || [Name] <- ets:match(pg2_table, {{pid, Pid, '$1'}})]. - -all_groups() -> - [N || [N] <- ets:match(pg2_table, {{group,'$1'}})]. + [Name || [Name] <- ets:match(pg_local_table, {{pid, Pid, '$1'}})]. ensure_started() -> case whereis(?MODULE) of undefined -> - C = {pg2, {?MODULE, start_link, []}, permanent, + C = {pg_local, {?MODULE, start_link, []}, permanent, 1000, worker, [?MODULE]}, supervisor:start_child(kernel_safe_sup, C); - Pg2Pid -> - {ok, Pg2Pid} - end. - - -kill_monitor_proc(RPid, Pid) -> - RPid =:= Pid orelse exit(RPid, kill). - -%% When/if erlang:monitor() returns before trying to connect to the -%% other node this function can be removed. -do_monitor(Pid) -> - case (node(Pid) =:= node()) orelse lists:member(node(Pid), nodes()) of - true -> - %% Assume the node is still up - {Pid, erlang:monitor(process, Pid)}; - false -> - F = fun() -> - Ref = erlang:monitor(process, Pid), - receive - {'DOWN', Ref, process, Pid, _Info} -> - exit(normal) - end - end, - erlang:spawn_monitor(F) + PgLocalPid -> + {ok, PgLocalPid} end. |