diff options
Diffstat (limited to 'src/rabbit_direct.erl')
-rw-r--r-- | src/rabbit_direct.erl | 34 |
1 files changed, 27 insertions, 7 deletions
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 7ff534ee..68afaf5d 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -16,7 +16,10 @@ -module(rabbit_direct). --export([boot/0, connect/4, start_channel/8, disconnect/1]). +-export([boot/0, force_event_refresh/0, list/0, connect/5, + start_channel/8, disconnect/2]). +%% Internal +-export([list_local/0]). -include("rabbit.hrl"). @@ -25,8 +28,12 @@ -ifdef(use_specs). -spec(boot/0 :: () -> 'ok'). --spec(connect/4 :: (rabbit_types:username(), rabbit_types:vhost(), - rabbit_types:protocol(), rabbit_event:event_props()) -> +-spec(force_event_refresh/0 :: () -> 'ok'). +-spec(list/0 :: () -> [pid()]). +-spec(list_local/0 :: () -> [pid()]). +-spec(connect/5 :: (rabbit_types:username(), rabbit_types:vhost(), + rabbit_types:protocol(), pid(), + rabbit_event:event_props()) -> {'ok', {rabbit_types:user(), rabbit_framing:amqp_table()}}). -spec(start_channel/8 :: @@ -34,7 +41,7 @@ rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()) -> {'ok', pid()}). --spec(disconnect/1 :: (rabbit_event:event_props()) -> 'ok'). +-spec(disconnect/2 :: (pid(), rabbit_event:event_props()) -> 'ok'). -endif. @@ -51,15 +58,27 @@ boot() -> transient, infinity, supervisor, [rabbit_client_sup]}), ok. +force_event_refresh() -> + [Pid ! force_event_refresh || Pid<- list()], + ok. + +list_local() -> + pg_local:get_members(rabbit_direct). + +list() -> + rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(), + rabbit_direct, list_local, []). + %%---------------------------------------------------------------------------- -connect(Username, VHost, Protocol, Infos) -> +connect(Username, VHost, Protocol, Pid, Infos) -> case lists:keymember(rabbit, 1, application:which_applications()) of true -> case rabbit_access_control:check_user_login(Username, []) of {ok, User} -> try rabbit_access_control:check_vhost_access(User, VHost) of - ok -> rabbit_event:notify(connection_created, Infos), + ok -> ok = pg_local:join(rabbit_direct, Pid), + rabbit_event:notify(connection_created, Infos), {ok, {User, rabbit_reader:server_properties(Protocol)}} catch @@ -82,5 +101,6 @@ start_channel(Number, ClientChannelPid, ConnPid, Protocol, User, VHost, Capabilities, Collector}]), {ok, ChannelPid}. -disconnect(Infos) -> +disconnect(Pid, Infos) -> + pg_local:leave(rabbit_direct, Pid), rabbit_event:notify(connection_closed, Infos). |