summaryrefslogtreecommitdiff
path: root/src/rabbit_direct.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_direct.erl')
-rw-r--r--src/rabbit_direct.erl34
1 files changed, 27 insertions, 7 deletions
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 7ff534ee..68afaf5d 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -16,7 +16,10 @@
-module(rabbit_direct).
--export([boot/0, connect/4, start_channel/8, disconnect/1]).
+-export([boot/0, force_event_refresh/0, list/0, connect/5,
+ start_channel/8, disconnect/2]).
+%% Internal
+-export([list_local/0]).
-include("rabbit.hrl").
@@ -25,8 +28,12 @@
-ifdef(use_specs).
-spec(boot/0 :: () -> 'ok').
--spec(connect/4 :: (rabbit_types:username(), rabbit_types:vhost(),
- rabbit_types:protocol(), rabbit_event:event_props()) ->
+-spec(force_event_refresh/0 :: () -> 'ok').
+-spec(list/0 :: () -> [pid()]).
+-spec(list_local/0 :: () -> [pid()]).
+-spec(connect/5 :: (rabbit_types:username(), rabbit_types:vhost(),
+ rabbit_types:protocol(), pid(),
+ rabbit_event:event_props()) ->
{'ok', {rabbit_types:user(),
rabbit_framing:amqp_table()}}).
-spec(start_channel/8 ::
@@ -34,7 +41,7 @@
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
pid()) -> {'ok', pid()}).
--spec(disconnect/1 :: (rabbit_event:event_props()) -> 'ok').
+-spec(disconnect/2 :: (pid(), rabbit_event:event_props()) -> 'ok').
-endif.
@@ -51,15 +58,27 @@ boot() ->
transient, infinity, supervisor, [rabbit_client_sup]}),
ok.
+force_event_refresh() ->
+ [Pid ! force_event_refresh || Pid<- list()],
+ ok.
+
+list_local() ->
+ pg_local:get_members(rabbit_direct).
+
+list() ->
+ rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(),
+ rabbit_direct, list_local, []).
+
%%----------------------------------------------------------------------------
-connect(Username, VHost, Protocol, Infos) ->
+connect(Username, VHost, Protocol, Pid, Infos) ->
case lists:keymember(rabbit, 1, application:which_applications()) of
true ->
case rabbit_access_control:check_user_login(Username, []) of
{ok, User} ->
try rabbit_access_control:check_vhost_access(User, VHost) of
- ok -> rabbit_event:notify(connection_created, Infos),
+ ok -> ok = pg_local:join(rabbit_direct, Pid),
+ rabbit_event:notify(connection_created, Infos),
{ok, {User,
rabbit_reader:server_properties(Protocol)}}
catch
@@ -82,5 +101,6 @@ start_channel(Number, ClientChannelPid, ConnPid, Protocol, User, VHost,
Capabilities, Collector}]),
{ok, ChannelPid}.
-disconnect(Infos) ->
+disconnect(Pid, Infos) ->
+ pg_local:leave(rabbit_direct, Pid),
rabbit_event:notify(connection_closed, Infos).