summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-02-01 20:23:01 +0000
committerMatthias Radestock <matthias@lshift.net>2010-02-01 20:23:01 +0000
commit8a47f1821ac5e532fa5c8185107f42e8c8142848 (patch)
treeb8c70cb20c63fa39a9935dc22e8fc6ff5ed2b93c
parentbd5e23e080330d8a4bb0debfa10fd6108c3701f2 (diff)
downloadrabbitmq-server-8a47f1821ac5e532fa5c8185107f42e8c8142848.tar.gz
make pg_local do what it is supposed to
-rw-r--r--src/pg_local.erl315
1 files changed, 71 insertions, 244 deletions
diff --git a/src/pg_local.erl b/src/pg_local.erl
index 58a3b798..7f771f74 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -1,4 +1,20 @@
+%% This file is a copy of pg2.erl from the R13B-3 Erlang/OTP
+%% distribution, with the following modifications:
%%
+%% 1) Process groups are node-local only.
+%%
+%% 2) Groups are created/deleted implicitly.
+%%
+%% 3) 'join' and 'leave' are asynchronous.
+%%
+%% 4) the type specs of the exported non-callback functions have been
+%% extracted into a separate, guarded section, and rewritten in
+%% old-style spec syntax, for better compatibility with older
+%% versions of Erlang/OTP. The remaining type specs have been
+%% removed.
+
+%% All modifications are (C) 2010 LShift Ltd.
+
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1997-2009. All Rights Reserved.
@@ -18,138 +34,49 @@
%%
-module(pg_local).
--export([create/1, delete/1, join/2, leave/2]).
--export([get_members/1, get_local_members/1]).
--export([get_closest_pid/1, which_groups/0]).
+-export([join/2, leave/2, get_members/1]).
-export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2,
terminate/2]).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(name() :: term()).
+
+-spec(start_link/0 :: () -> {'ok', pid()} | {'error', term()}).
+-spec(start/0 :: () -> {'ok', pid()} | {'error', term()}).
+-spec(join/2 :: (name(), pid()) -> 'ok').
+-spec(leave/2 :: (name(), pid()) -> 'ok').
+-spec(get_members/1 :: (name()) -> [pid()]).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
%%% As of R13B03 monitors are used instead of links.
%%%
%%% Exported functions
%%%
--spec start_link() -> {'ok', pid()} | {'error', term()}.
-
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
--spec start() -> {'ok', pid()} | {'error', term()}.
-
start() ->
ensure_started().
--spec create(term()) -> 'ok'.
-
-create(Name) ->
- ensure_started(),
- case ets:member(pg2_table, {group, Name}) of
- false ->
- global:trans({{?MODULE, Name}, self()},
- fun() ->
- gen_server:multi_call(?MODULE, {create, Name})
- end),
- ok;
- true ->
- ok
- end.
-
--type name() :: term().
-
--spec delete(name()) -> 'ok'.
-
-delete(Name) ->
- ensure_started(),
- global:trans({{?MODULE, Name}, self()},
- fun() ->
- gen_server:multi_call(?MODULE, {delete, Name})
- end),
- ok.
-
--spec join(name(), pid()) -> 'ok' | {'error', {'no_such_group', term()}}.
-
join(Name, Pid) when is_pid(Pid) ->
ensure_started(),
- case ets:member(pg2_table, {group, Name}) of
- false ->
- {error, {no_such_group, Name}};
- true ->
- global:trans({{?MODULE, Name}, self()},
- fun() ->
- gen_server:multi_call(?MODULE,
- {join, Name, Pid})
- end),
- ok
- end.
-
--spec leave(name(), pid()) -> 'ok' | {'error', {'no_such_group', name()}}.
+ gen_server:cast(?MODULE, {join, Name, Pid}).
leave(Name, Pid) when is_pid(Pid) ->
ensure_started(),
- case ets:member(pg2_table, {group, Name}) of
- false ->
- {error, {no_such_group, Name}};
- true ->
- global:trans({{?MODULE, Name}, self()},
- fun() ->
- gen_server:multi_call(?MODULE,
- {leave, Name, Pid})
- end),
- ok
- end.
-
--type get_members_ret() :: [pid()] | {'error', {'no_such_group', name()}}.
+ gen_server:cast(?MODULE, {leave, Name, Pid}).
--spec get_members(name()) -> get_members_ret().
-
get_members(Name) ->
ensure_started(),
- case ets:member(pg2_table, {group, Name}) of
- true ->
- group_members(Name);
- false ->
- {error, {no_such_group, Name}}
- end.
-
--spec get_local_members(name()) -> get_members_ret().
-
-get_local_members(Name) ->
- ensure_started(),
- case ets:member(pg2_table, {group, Name}) of
- true ->
- local_group_members(Name);
- false ->
- {error, {no_such_group, Name}}
- end.
-
--spec which_groups() -> [name()].
-
-which_groups() ->
- ensure_started(),
- all_groups().
-
--type gcp_error_reason() :: {'no_process', term()} | {'no_such_group', term()}.
-
--spec get_closest_pid(term()) -> pid() | {'error', gcp_error_reason()}.
-
-get_closest_pid(Name) ->
- case get_local_members(Name) of
- [Pid] ->
- Pid;
- [] ->
- {_,_,X} = erlang:now(),
- case get_members(Name) of
- [] -> {error, {no_process, Name}};
- Members ->
- lists:nth((X rem length(Members))+1, Members)
- end;
- Members when is_list(Members) ->
- {_,_,X} = erlang:now(),
- lists:nth((X rem length(Members))+1, Members);
- Else ->
- Else
- end.
+ group_members(Name).
%%%
%%% Callback functions from gen_server
@@ -157,162 +84,95 @@ get_closest_pid(Name) ->
-record(state, {}).
--spec init([]) -> {'ok', #state{}}.
-
init([]) ->
- Ns = nodes(),
- net_kernel:monitor_nodes(true),
- lists:foreach(fun(N) ->
- {?MODULE, N} ! {new_pg2, node()},
- self() ! {nodeup, N}
- end, Ns),
- pg2_table = ets:new(pg2_table, [ordered_set, protected, named_table]),
+ pg_local_table = ets:new(pg_local_table, [ordered_set, protected, named_table]),
{ok, #state{}}.
--type call() :: {'create', name()}
- | {'delete', name()}
- | {'join', name(), pid()}
- | {'leave', name(), pid()}.
-
--spec handle_call(call(), _, #state{}) ->
- {'reply', 'ok', #state{}}.
-
-handle_call({create, Name}, _From, S) ->
- assure_group(Name),
- {reply, ok, S};
-handle_call({join, Name, Pid}, _From, S) ->
- ets:member(pg2_table, {group, Name}) andalso join_group(Name, Pid),
- {reply, ok, S};
-handle_call({leave, Name, Pid}, _From, S) ->
- ets:member(pg2_table, {group, Name}) andalso leave_group(Name, Pid),
- {reply, ok, S};
-handle_call({delete, Name}, _From, S) ->
- delete_group(Name),
- {reply, ok, S};
handle_call(Request, From, S) ->
- error_logger:warning_msg("The pg2 server received an unexpected message:\n"
+ error_logger:warning_msg("The pg_local server received an unexpected message:\n"
"handle_call(~p, ~p, _)\n",
[Request, From]),
{noreply, S}.
--type all_members() :: [[name(),...]].
--type cast() :: {'exchange', node(), all_members()}
- | {'del_member', name(), pid()}.
-
--spec handle_cast(cast(), #state{}) -> {'noreply', #state{}}.
-
-handle_cast({exchange, _Node, List}, S) ->
- store(List),
+handle_cast({join, Name, Pid}, S) ->
+ join_group(Name, Pid),
+ {noreply, S};
+handle_cast({leave, Name, Pid}, S) ->
+ leave_group(Name, Pid),
{noreply, S};
handle_cast(_, S) ->
- %% Ignore {del_member, Name, Pid}.
{noreply, S}.
--spec handle_info(tuple(), #state{}) -> {'noreply', #state{}}.
-
handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) ->
member_died(MonitorRef),
{noreply, S};
-handle_info({nodeup, Node}, S) ->
- gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
- {noreply, S};
-handle_info({new_pg2, Node}, S) ->
- gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
- {noreply, S};
handle_info(_, S) ->
{noreply, S}.
--spec terminate(term(), #state{}) -> 'ok'.
-
terminate(_Reason, _S) ->
- true = ets:delete(pg2_table),
+ true = ets:delete(pg_local_table),
ok.
%%%
%%% Local functions
%%%
-%%% One ETS table, pg2_table, is used for bookkeeping. The type of the
+%%% One ETS table, pg_local_table, is used for bookkeeping. The type of the
%%% table is ordered_set, and the fast matching of partially
%%% instantiated keys is used extensively.
%%%
-%%% {{group, Name}}
-%%% Process group Name.
-%%% {{ref, Pid}, RPid, MonitorRef, Counter}
+%%% {{ref, Pid}, MonitorRef, Counter}
%%% {{ref, MonitorRef}, Pid}
-%%% Each process has one monitor. Sometimes a process is spawned to
-%%% monitor the pid (RPid). Counter is incremented when the Pid joins
-%%% some group.
-%%% {{member, Name, Pid}, GroupCounter}
-%%% {{local_member, Name, Pid}}
+%%% Each process has one monitor. Counter is incremented when the
+%%% Pid joins some group.
+%%% {{member, Name, Pid}, _}
%%% Pid is a member of group Name, GroupCounter is incremented when the
%%% Pid joins the group Name.
%%% {{pid, Pid, Name}}
%%% Pid is a member of group Name.
-store(List) ->
- _ = [assure_group(Name) andalso [join_group(Name, P) || P <- Members] ||
- [Name, Members] <- List],
- ok.
-
-assure_group(Name) ->
- Key = {group, Name},
- ets:member(pg2_table, Key) orelse true =:= ets:insert(pg2_table, {Key}).
-
-delete_group(Name) ->
- _ = [leave_group(Name, Pid) || Pid <- group_members(Name)],
- true = ets:delete(pg2_table, {group, Name}),
- ok.
-
member_died(Ref) ->
- [{{ref, Ref}, Pid}] = ets:lookup(pg2_table, {ref, Ref}),
+ [{{ref, Ref}, Pid}] = ets:lookup(pg_local_table, {ref, Ref}),
Names = member_groups(Pid),
_ = [leave_group(Name, P) ||
Name <- Names,
P <- member_in_group(Pid, Name)],
- %% Kept for backward compatibility with links. Can be removed, eventually.
- _ = [gen_server:abcast(nodes(), ?MODULE, {del_member, Name, Pid}) ||
- Name <- Names],
ok.
join_group(Name, Pid) ->
Ref_Pid = {ref, Pid},
- try _ = ets:update_counter(pg2_table, Ref_Pid, {4, +1})
+ try _ = ets:update_counter(pg_local_table, Ref_Pid, {3, +1})
catch _:_ ->
- {RPid, Ref} = do_monitor(Pid),
- true = ets:insert(pg2_table, {Ref_Pid, RPid, Ref, 1}),
- true = ets:insert(pg2_table, {{ref, Ref}, Pid})
+ Ref = erlang:monitor(process, Pid),
+ true = ets:insert(pg_local_table, {Ref_Pid, Ref, 1}),
+ true = ets:insert(pg_local_table, {{ref, Ref}, Pid})
end,
Member_Name_Pid = {member, Name, Pid},
- try _ = ets:update_counter(pg2_table, Member_Name_Pid, {2, +1})
+ try _ = ets:update_counter(pg_local_table, Member_Name_Pid, {2, +1})
catch _:_ ->
- true = ets:insert(pg2_table, {Member_Name_Pid, 1}),
- _ = [ets:insert(pg2_table, {{local_member, Name, Pid}}) ||
- node(Pid) =:= node()],
- true = ets:insert(pg2_table, {{pid, Pid, Name}})
+ true = ets:insert(pg_local_table, {Member_Name_Pid, 1}),
+ true = ets:insert(pg_local_table, {{pid, Pid, Name}})
end.
leave_group(Name, Pid) ->
Member_Name_Pid = {member, Name, Pid},
- try ets:update_counter(pg2_table, Member_Name_Pid, {2, -1}) of
+ try ets:update_counter(pg_local_table, Member_Name_Pid, {2, -1}) of
N ->
if
N =:= 0 ->
- true = ets:delete(pg2_table, {pid, Pid, Name}),
- _ = [ets:delete(pg2_table, {local_member, Name, Pid}) ||
- node(Pid) =:= node()],
- true = ets:delete(pg2_table, Member_Name_Pid);
+ true = ets:delete(pg_local_table, {pid, Pid, Name}),
+ true = ets:delete(pg_local_table, Member_Name_Pid);
true ->
ok
end,
Ref_Pid = {ref, Pid},
- case ets:update_counter(pg2_table, Ref_Pid, {4, -1}) of
+ case ets:update_counter(pg_local_table, Ref_Pid, {3, -1}) of
0 ->
- [{Ref_Pid,RPid,Ref,0}] = ets:lookup(pg2_table, Ref_Pid),
- true = ets:delete(pg2_table, {ref, Ref}),
- true = ets:delete(pg2_table, Ref_Pid),
+ [{Ref_Pid,Ref,0}] = ets:lookup(pg_local_table, Ref_Pid),
+ true = ets:delete(pg_local_table, {ref, Ref}),
+ true = ets:delete(pg_local_table, Ref_Pid),
true = erlang:demonitor(Ref, [flush]),
- kill_monitor_proc(RPid, Pid);
+ ok;
_ ->
ok
end
@@ -320,57 +180,24 @@ leave_group(Name, Pid) ->
ok
end.
-all_members() ->
- [[G, group_members(G)] || G <- all_groups()].
-
group_members(Name) ->
[P ||
- [P, N] <- ets:match(pg2_table, {{member, Name, '$1'},'$2'}),
+ [P, N] <- ets:match(pg_local_table, {{member, Name, '$1'},'$2'}),
_ <- lists:seq(1, N)].
-local_group_members(Name) ->
- [P ||
- [Pid] <- ets:match(pg2_table, {{local_member, Name, '$1'}}),
- P <- member_in_group(Pid, Name)].
-
member_in_group(Pid, Name) ->
- [{{member, Name, Pid}, N}] = ets:lookup(pg2_table, {member, Name, Pid}),
+ [{{member, Name, Pid}, N}] = ets:lookup(pg_local_table, {member, Name, Pid}),
lists:duplicate(N, Pid).
member_groups(Pid) ->
- [Name || [Name] <- ets:match(pg2_table, {{pid, Pid, '$1'}})].
-
-all_groups() ->
- [N || [N] <- ets:match(pg2_table, {{group,'$1'}})].
+ [Name || [Name] <- ets:match(pg_local_table, {{pid, Pid, '$1'}})].
ensure_started() ->
case whereis(?MODULE) of
undefined ->
- C = {pg2, {?MODULE, start_link, []}, permanent,
+ C = {pg_local, {?MODULE, start_link, []}, permanent,
1000, worker, [?MODULE]},
supervisor:start_child(kernel_safe_sup, C);
- Pg2Pid ->
- {ok, Pg2Pid}
- end.
-
-
-kill_monitor_proc(RPid, Pid) ->
- RPid =:= Pid orelse exit(RPid, kill).
-
-%% When/if erlang:monitor() returns before trying to connect to the
-%% other node this function can be removed.
-do_monitor(Pid) ->
- case (node(Pid) =:= node()) orelse lists:member(node(Pid), nodes()) of
- true ->
- %% Assume the node is still up
- {Pid, erlang:monitor(process, Pid)};
- false ->
- F = fun() ->
- Ref = erlang:monitor(process, Pid),
- receive
- {'DOWN', Ref, process, Pid, _Info} ->
- exit(normal)
- end
- end,
- erlang:spawn_monitor(F)
+ PgLocalPid ->
+ {ok, PgLocalPid}
end.