summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-08-16 16:14:26 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-08-16 16:14:26 +0100
commitb5880fde7eb1e5680c07af4f7ab054ae197e2c0b (patch)
tree77b6223109286cc429e444404c6afcedb859020a
parent499fc7b0df9ce6d9b97cfbac3b5ce667025e0d5c (diff)
downloadrabbitmq-server-b5880fde7eb1e5680c07af4f7ab054ae197e2c0b.tar.gz
Move the pg_local management into rabbit_direct.
-rw-r--r--src/rabbit.erl1
-rw-r--r--src/rabbit_direct.erl30
2 files changed, 24 insertions, 7 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 5e9c84ef..6a9a0e47 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -514,6 +514,7 @@ log_rotation_result(ok, ok) ->
ok.
force_event_refresh() ->
+ rabbit_direct:force_event_refresh(),
rabbit_networking:force_connection_event_refresh(),
rabbit_channel:force_event_refresh(),
rabbit_amqqueue:force_event_refresh().
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 7ff534ee..6d06d146 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -16,7 +16,8 @@
-module(rabbit_direct).
--export([boot/0, connect/4, start_channel/8, disconnect/1]).
+-export([boot/0, force_event_refresh/0, list/0, list_local/0, connect/5,
+ start_channel/8, disconnect/2]).
-include("rabbit.hrl").
@@ -25,8 +26,12 @@
-ifdef(use_specs).
-spec(boot/0 :: () -> 'ok').
--spec(connect/4 :: (rabbit_types:username(), rabbit_types:vhost(),
- rabbit_types:protocol(), rabbit_event:event_props()) ->
+-spec(force_event_refresh/0 :: () -> 'ok').
+-spec(list/0 :: () -> [pid()]).
+-spec(list_local/0 :: () -> [pid()]).
+-spec(connect/5 :: (rabbit_types:username(), rabbit_types:vhost(),
+ rabbit_types:protocol(), pid(),
+ rabbit_event:event_props()) ->
{'ok', {rabbit_types:user(),
rabbit_framing:amqp_table()}}).
-spec(start_channel/8 ::
@@ -34,7 +39,7 @@
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
pid()) -> {'ok', pid()}).
--spec(disconnect/1 :: (rabbit_event:event_props()) -> 'ok').
+-spec(disconnect/2 :: (pid(), rabbit_event:event_props()) -> 'ok').
-endif.
@@ -51,15 +56,25 @@ boot() ->
transient, infinity, supervisor, [rabbit_client_sup]}),
ok.
+force_event_refresh() ->
+ [Pid ! force_event_refresh || Pid<- list()].
+
+list_local() ->
+ pg_local:get_members(rabbit_direct).
+
+list() ->
+ rabbit_misc:append_rpc_all_nodes(rabbit_direct, list_local, []).
+
%%----------------------------------------------------------------------------
-connect(Username, VHost, Protocol, Infos) ->
+connect(Username, VHost, Protocol, Pid, Infos) ->
case lists:keymember(rabbit, 1, application:which_applications()) of
true ->
case rabbit_access_control:check_user_login(Username, []) of
{ok, User} ->
try rabbit_access_control:check_vhost_access(User, VHost) of
- ok -> rabbit_event:notify(connection_created, Infos),
+ ok -> ok = pg_local:join(rabbit_direct, Pid),
+ rabbit_event:notify(connection_created, Infos),
{ok, {User,
rabbit_reader:server_properties(Protocol)}}
catch
@@ -82,5 +97,6 @@ start_channel(Number, ClientChannelPid, ConnPid, Protocol, User, VHost,
Capabilities, Collector}]),
{ok, ChannelPid}.
-disconnect(Infos) ->
+disconnect(Pid, Infos) ->
+ pg_local:leave(rabbit_direct, Pid),
rabbit_event:notify(connection_closed, Infos).