diff options
author | Ben Hood <0x6e6562@gmail.com> | 2009-02-02 17:28:51 +0000 |
---|---|---|
committer | Ben Hood <0x6e6562@gmail.com> | 2009-02-02 17:28:51 +0000 |
commit | 64e05c746a03b702c150d1eaea9c9c0a24c192bc (patch) | |
tree | 220856349d4137f7baaf1f0e35083370140ec345 /src | |
parent | f1fa48a8b4c978d4cd7053618066f663cba8e4b3 (diff) | |
parent | 99b26135934450b2361d91d6a55efaa2e8c08638 (diff) | |
download | rabbitmq-server-64e05c746a03b702c150d1eaea9c9c0a24c192bc.tar.gz |
Merged bug 20260 into default
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit.erl | 7 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 36 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 59 | ||||
-rw-r--r-- | src/rabbit_control.erl | 6 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 14 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 12 |
6 files changed, 77 insertions, 57 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 7f808bc9..d9a82f0e 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -265,13 +265,14 @@ insert_default_data() -> {ok, DefaultUser} = application:get_env(default_user), {ok, DefaultPass} = application:get_env(default_pass), {ok, DefaultVHost} = application:get_env(default_vhost), - {ok, [DefaultConfigurationPerm, DefaultMessagingPerm]} = + {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} = application:get_env(default_permissions), ok = rabbit_access_control:add_vhost(DefaultVHost), ok = rabbit_access_control:add_user(DefaultUser, DefaultPass), ok = rabbit_access_control:set_permissions(DefaultUser, DefaultVHost, - DefaultConfigurationPerm, - DefaultMessagingPerm), + DefaultConfigurePerm, + DefaultWritePerm, + DefaultReadPerm), ok. start_builtin_amq_applications() -> diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 394eb2b1..da0ab9cf 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -38,7 +38,7 @@ -export([add_user/2, delete_user/1, change_password/2, list_users/0, lookup_user/1]). -export([add_vhost/1, delete_vhost/1, list_vhosts/0]). --export([set_permissions/4, clear_permissions/2, +-export([set_permissions/5, clear_permissions/2, list_vhost_permissions/1, list_user_permissions/1]). %%---------------------------------------------------------------------------- @@ -58,12 +58,13 @@ -spec(add_vhost/1 :: (vhost()) -> 'ok'). -spec(delete_vhost/1 :: (vhost()) -> 'ok'). -spec(list_vhosts/0 :: () -> [vhost()]). --spec(set_permissions/4 :: (username(), vhost(), regexp(), regexp()) -> 'ok'). +-spec(set_permissions/5 :: + (username(), vhost(), regexp(), regexp(), regexp()) -> 'ok'). -spec(clear_permissions/2 :: (username(), vhost()) -> 'ok'). -spec(list_vhost_permissions/1 :: - (vhost()) -> [{username(), regexp(), regexp()}]). + (vhost()) -> [{username(), regexp(), regexp(), regexp()}]). -spec(list_user_permissions/1 :: - (username()) -> [{vhost(), regexp(), regexp()}]). + (username()) -> [{vhost(), regexp(), regexp(), regexp()}]). -endif. @@ -272,7 +273,7 @@ internal_delete_vhost(VHostPath) -> ok = rabbit_exchange:delete(Name, false) end, rabbit_exchange:list(VHostPath)), - lists:foreach(fun ({Username, _, _}) -> + lists:foreach(fun ({Username, _, _, _}) -> ok = clear_permissions(Username, VHostPath) end, list_vhost_permissions(VHostPath)), @@ -289,9 +290,8 @@ validate_regexp(RegexpBin) -> {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}}) end. -set_permissions(Username, VHostPath, ConfigurationPerm, MessagingPerm) -> - validate_regexp(ConfigurationPerm), - validate_regexp(MessagingPerm), +set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> + lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]), rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( Username, VHostPath, @@ -301,8 +301,9 @@ set_permissions(Username, VHostPath, ConfigurationPerm, MessagingPerm) -> username = Username, virtual_host = VHostPath}, permission = #permission{ - configuration = ConfigurationPerm, - messaging = MessagingPerm}}, + configure = ConfigurePerm, + write = WritePerm, + read = ReadPerm}}, write) end)). @@ -317,24 +318,25 @@ clear_permissions(Username, VHostPath) -> end)). list_vhost_permissions(VHostPath) -> - [{Username, ConfigurationPerm, MessagingPerm} || - {Username, _, ConfigurationPerm, MessagingPerm} <- + [{Username, ConfigurePerm, WritePerm, ReadPerm} || + {Username, _, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(rabbit_misc:with_vhost( VHostPath, match_user_vhost('_', VHostPath)))]. list_user_permissions(Username) -> - [{VHostPath, ConfigurationPerm, MessagingPerm} || - {_, VHostPath, ConfigurationPerm, MessagingPerm} <- + [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(rabbit_misc:with_user( Username, match_user_vhost(Username, '_')))]. list_permissions(QueryThunk) -> - [{Username, VHostPath, ConfigurationPerm, MessagingPerm} || + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || #user_permission{user_vhost = #user_vhost{username = Username, virtual_host = VHostPath}, permission = #permission{ - configuration = ConfigurationPerm, - messaging = MessagingPerm}} <- + configure = ConfigurePerm, + write = WritePerm, + read = ReadPerm}} <- %% TODO: use dirty ops instead rabbit_misc:execute_mnesia_transaction(QueryThunk)]. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5a1c0952..192ebacd 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,12 +35,12 @@ -behaviour(gen_server2). --export([start_link/4, do/2, do/3, shutdown/1]). +-export([start_link/5, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --record(ch, {state, reader_pid, writer_pid, limiter_pid, +-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, @@ -54,7 +54,8 @@ -ifdef(use_specs). --spec(start_link/4 :: (pid(), pid(), username(), vhost()) -> pid()). +-spec(start_link/5 :: + (channel_number(), pid(), pid(), username(), vhost()) -> pid()). -spec(do/2 :: (pid(), amqp_method()) -> 'ok'). -spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). @@ -66,9 +67,10 @@ %%---------------------------------------------------------------------------- -start_link(ReaderPid, WriterPid, Username, VHost) -> +start_link(Channel, ReaderPid, WriterPid, Username, VHost) -> {ok, Pid} = gen_server2:start_link( - ?MODULE, [ReaderPid, WriterPid, Username, VHost], []), + ?MODULE, [Channel, ReaderPid, WriterPid, + Username, VHost], []), Pid. do(Pid, Method) -> @@ -91,11 +93,12 @@ conserve_memory(Pid, Conserve) -> %%--------------------------------------------------------------------------- -init([ReaderPid, WriterPid, Username, VHost]) -> +init([Channel, ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), {ok, #ch{state = starting, + channel = Channel, reader_pid = ReaderPid, writer_pid = WriterPid, limiter_pid = undefined, @@ -123,8 +126,11 @@ handle_cast({method, Method, Content}, State) -> {stop, normal, State#ch{state = terminating}} catch exit:{amqp, Error, Explanation, none} -> - {stop, {amqp, Error, Explanation, - rabbit_misc:method_record_type(Method)}, State}; + ok = notify_queues(internal_rollback(State)), + Reason = {amqp, Error, Explanation, + rabbit_misc:method_record_type(Method)}, + State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, + {stop, normal, State#ch{state = terminating}}; exit:normal -> {stop, normal, State}; _:Reason -> @@ -224,11 +230,14 @@ clear_permission_cache() -> erase(permission_cache), ok. -check_configuration_permitted(Resource, #ch{ username = Username}) -> - check_resource_access(Username, Resource, #permission.configuration). +check_configure_permitted(Resource, #ch{ username = Username}) -> + check_resource_access(Username, Resource, #permission.configure). -check_messaging_permitted(Resource, #ch{ username = Username}) -> - check_resource_access(Username, Resource, #permission.messaging). +check_write_permitted(Resource, #ch{ username = Username}) -> + check_resource_access(Username, Resource, #permission.write). + +check_read_permitted(Resource, #ch{ username = Username}) -> + check_resource_access(Username, Resource, #permission.read). expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( @@ -299,7 +308,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, immediate = Immediate}, Content, State = #ch{ virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_messaging_permitted(ExchangeName, State), + check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. @@ -343,7 +352,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, _, State = #ch{ writer_pid = WriterPid, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_messaging_permitted(QueueName, State), + check_read_permitted(QueueName, State), case rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of @@ -378,7 +387,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_messaging_permitted(QueueName, State), + check_read_permitted(QueueName, State), ActualConsumerTag = case ConsumerTag of <<>> -> rabbit_misc:binstring_guid("amq.ctag"); @@ -537,7 +546,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, _, State = #ch{ virtual_host = VHostPath }) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_configuration_permitted(ExchangeName, State), + check_configure_permitted(ExchangeName, State), X = case rabbit_exchange:lookup(ExchangeName) of {ok, FoundX} -> FoundX; {error, not_found} -> @@ -557,7 +566,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, nowait = NoWait}, _, State = #ch{ virtual_host = VHostPath }) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_configuration_permitted(ExchangeName, State), + check_configure_permitted(ExchangeName, State), X = rabbit_exchange:lookup_or_die(ExchangeName), ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)), return_ok(State, NoWait, #'exchange.declare_ok'{}); @@ -567,7 +576,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, nowait = NoWait}, _, State = #ch { virtual_host = VHostPath }) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_configuration_permitted(ExchangeName, State), + check_configure_permitted(ExchangeName, State), case rabbit_exchange:delete(ExchangeName, IfUnused) of {error, not_found} -> rabbit_misc:protocol_error( @@ -618,11 +627,11 @@ handle_method(#'queue.declare'{queue = QueueNameBin, Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - check_configuration_permitted(QueueName, State), + check_configure_permitted(QueueName, State), Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args)); Other = #amqqueue{name = QueueName} -> - check_configuration_permitted(QueueName, State), + check_configure_permitted(QueueName, State), Other end, return_queue_declare_ok(State, NoWait, Q); @@ -632,7 +641,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{ virtual_host = VHostPath }) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), - check_configuration_permitted(QueueName, State), + check_configure_permitted(QueueName, State), Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end), return_queue_declare_ok(State, NoWait, Q); @@ -643,7 +652,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, }, _, State) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_configuration_permitted(QueueName, State), + check_configure_permitted(QueueName, State), case rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of @@ -680,7 +689,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, _, State) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_messaging_permitted(QueueName, State), + check_read_permitted(QueueName, State), {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> rabbit_amqqueue:purge(Q) end), @@ -730,11 +739,11 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, %% FIXME: don't allow binding to internal exchanges - %% including the one named "" ! QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_configuration_permitted(QueueName, State), + check_write_permitted(QueueName, State), ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_configuration_permitted(ExchangeName, State), + check_read_permitted(ExchangeName, State), case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of {error, queue_not_found} -> rabbit_misc:protocol_error( diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 293cd797..e6717d68 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -114,7 +114,7 @@ Available commands: delete_vhost <VHostPath> list_vhosts - set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> + set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp> clear_permissions [-p <VHostPath>] <UserName> list_permissions [-p <VHostPath>] list_user_permissions <UserName> @@ -267,10 +267,10 @@ action(Command, Node, Args, Inform) -> {VHost, RemainingArgs} = parse_vhost_flag(Args), action(Command, Node, VHost, RemainingArgs, Inform). -action(set_permissions, Node, VHost, [Username, CPerm, MPerm], Inform) -> +action(set_permissions, Node, VHost, [Username, CPerm, WPerm, RPerm], Inform) -> Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), call(Node, {rabbit_access_control, set_permissions, - [Username, VHost, CPerm, MPerm]}); + [Username, VHost, CPerm, WPerm, RPerm]}); action(clear_permissions, Node, VHost, [Username], Inform) -> Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 9dbc49df..12ee299e 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -284,6 +284,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> exit(Reason); {'EXIT', _Pid, E = {writer, send_failed, _Error}} -> throw(E); + {channel_exit, Channel, Reason} -> + mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); {'EXIT', Pid, Reason} -> mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State)); {terminate_channel, Channel, Ref1} -> @@ -351,6 +353,14 @@ terminate_channel(Channel, Ref, State) -> end, State. +handle_channel_exit(Channel, Reason, State) -> + %% We remove the channel from the inbound map only. That allows + %% the channel to be re-opened, but also means the remaining + %% cleanup, including possibly closing the connection, is deferred + %% until we get the (normal) exit signal. + erase({channel, Channel}), + handle_exception(State, Channel, Reason). + handle_dependent_exit(Pid, normal, State) -> channel_cleanup(Pid), maybe_close(State); @@ -711,8 +721,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> vhost = VHost}} = State, WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/4, - [self(), WriterPid, Username, VHost]), + fun rabbit_channel:start_link/5, + [Channel, self(), WriterPid, Username, VHost]), put({channel, Channel}, {chpid, ChPid}), put({chpid, ChPid}, {channel, Channel}), ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame); diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ef390e4d..6312e8e3 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -444,7 +444,7 @@ test_user_management() -> {error, {no_such_vhost, _}} = control_action(delete_vhost, ["/testhost"]), {error, {no_such_user, _}} = - control_action(set_permissions, ["foo", ".*", ".*"]), + control_action(set_permissions, ["foo", ".*", ".*", ".*"]), {error, {no_such_user, _}} = control_action(clear_permissions, ["foo"]), {error, {no_such_user, _}} = @@ -452,9 +452,7 @@ test_user_management() -> {error, {no_such_vhost, _}} = control_action(list_permissions, ["-p", "/testhost"]), {error, {invalid_regexp, _, _}} = - control_action(set_permissions, ["guest", "+foo", ".*"]), - {error, {invalid_regexp, _, _}} = - control_action(set_permissions, ["guest", ".*", "+foo"]), + control_action(set_permissions, ["guest", "+foo", ".*", ".*"]), %% user creation ok = control_action(add_user, ["foo", "bar"]), @@ -471,9 +469,9 @@ test_user_management() -> %% user/vhost mapping ok = control_action(set_permissions, ["-p", "/testhost", - "foo", ".*", ".*"]), + "foo", ".*", ".*", ".*"]), ok = control_action(set_permissions, ["-p", "/testhost", - "foo", ".*", ".*"]), + "foo", ".*", ".*", ".*"]), ok = control_action(list_permissions, ["-p", "/testhost"]), ok = control_action(list_user_permissions, ["foo"]), @@ -489,7 +487,7 @@ test_user_management() -> %% deleting a populated vhost ok = control_action(add_vhost, ["/testhost"]), ok = control_action(set_permissions, ["-p", "/testhost", - "foo", ".*", ".*"]), + "foo", ".*", ".*", ".*"]), ok = control_action(delete_vhost, ["/testhost"]), %% user deletion |