summaryrefslogtreecommitdiff
path: root/src/rabbit_direct.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_direct.erl')
-rw-r--r--src/rabbit_direct.erl62
1 files changed, 37 insertions, 25 deletions
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index c07ad832..a7ee3276 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -10,8 +10,8 @@
%%
%% The Original Code is RabbitMQ.
%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
%%
-module(rabbit_direct).
@@ -31,16 +31,18 @@
-spec(force_event_refresh/0 :: () -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
--spec(connect/5 :: (rabbit_types:username(), rabbit_types:vhost(),
- rabbit_types:protocol(), pid(),
+-spec(connect/5 :: ((rabbit_types:username() | rabbit_types:user() |
+ {rabbit_types:username(), rabbit_types:password()}),
+ rabbit_types:vhost(), rabbit_types:protocol(), pid(),
rabbit_event:event_props()) ->
- {'ok', {rabbit_types:user(),
- rabbit_framing:amqp_table()}}).
+ rabbit_types:ok_or_error2(
+ {rabbit_types:user(), rabbit_framing:amqp_table()},
+ 'broker_not_found_on_node' | 'auth_failure' |
+ 'access_refused')).
-spec(start_channel/9 ::
(rabbit_channel:channel_number(), pid(), pid(), string(),
rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
rabbit_framing:amqp_table(), pid()) -> {'ok', pid()}).
-
-spec(disconnect/2 :: (pid(), rabbit_event:event_props()) -> 'ok').
-endif.
@@ -60,32 +62,42 @@ list_local() ->
pg_local:get_members(rabbit_direct).
list() ->
- rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(),
+ rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_direct, list_local, []).
%%----------------------------------------------------------------------------
+connect(User = #user{}, VHost, Protocol, Pid, Infos) ->
+ try rabbit_access_control:check_vhost_access(User, VHost) of
+ ok -> ok = pg_local:join(rabbit_direct, Pid),
+ rabbit_event:notify(connection_created, Infos),
+ {ok, {User, rabbit_reader:server_properties(Protocol)}}
+ catch
+ exit:#amqp_error{name = access_refused} ->
+ {error, access_refused}
+ end;
+
+connect({Username, Password}, VHost, Protocol, Pid, Infos) ->
+ connect0(fun () -> rabbit_access_control:check_user_pass_login(
+ Username, Password) end,
+ VHost, Protocol, Pid, Infos);
+
connect(Username, VHost, Protocol, Pid, Infos) ->
+ connect0(fun () -> rabbit_access_control:check_user_login(
+ Username, []) end,
+ VHost, Protocol, Pid, Infos).
+
+connect0(AuthFun, VHost, Protocol, Pid, Infos) ->
case rabbit:is_running() of
- true ->
- case rabbit_access_control:check_user_login(Username, []) of
- {ok, User} ->
- try rabbit_access_control:check_vhost_access(User, VHost) of
- ok -> ok = pg_local:join(rabbit_direct, Pid),
- rabbit_event:notify(connection_created, Infos),
- {ok, {User,
- rabbit_reader:server_properties(Protocol)}}
- catch
- exit:#amqp_error{name = access_refused} ->
- {error, access_refused}
- end;
- {refused, _Msg, _Args} ->
- {error, auth_failure}
- end;
- false ->
- {error, broker_not_found_on_node}
+ true -> case AuthFun() of
+ {ok, User} -> connect(User, VHost, Protocol, Pid,
+ Infos);
+ {refused, _M, _A} -> {error, auth_failure}
+ end;
+ false -> {error, broker_not_found_on_node}
end.
+
start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
VHost, Capabilities, Collector) ->
{ok, _, {ChannelPid, _}} =