diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-12-20 17:33:51 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-12-20 17:33:51 +0000 |
commit | a895553bfdc0cb380b72725538a742c33dd39612 (patch) | |
tree | 0cadc9082cf40e9aab561502814686a2e9e0e02d | |
parent | b660430fc3a4f535090e5ee833b63af9e6508f17 (diff) | |
parent | 31ec612035ab84532f5fbbaa575ca7b3ecb6e1d1 (diff) | |
download | rabbitmq-server-a895553bfdc0cb380b72725538a742c33dd39612.tar.gz |
Merge from default
-rw-r--r-- | docs/rabbitmqctl.1.xml | 4 | ||||
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/delegate.erl | 8 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 10 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 50 | ||||
-rw-r--r-- | src/rabbit_auth_mechanism_plain.erl | 11 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 13 | ||||
-rw-r--r-- | src/rabbit_error_logger.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 29 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 24 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 14 |
12 files changed, 120 insertions, 49 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 35de1cea..2419a54b 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -866,6 +866,10 @@ <listitem><para>Whether the exchange will be deleted automatically when no longer used.</para></listitem> </varlistentry> <varlistentry> + <term>internal</term> + <listitem><para>Whether the exchange is internal, i.e. cannot be directly published to by a client.</para></listitem> + </varlistentry> + <varlistentry> <term>arguments</term> <listitem><para>Exchange arguments.</para></listitem> </varlistentry> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index e5fceaed..81c3996b 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -57,7 +57,7 @@ -record(resource, {virtual_host, kind, name}). --record(exchange, {name, type, durable, auto_delete, arguments}). +-record(exchange, {name, type, durable, auto_delete, internal, arguments}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid}). diff --git a/src/delegate.erl b/src/delegate.erl index 11abe73b..8e64f3d0 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -31,6 +31,14 @@ -module(delegate). +%% The reason we have local delegate processes is because we want to +%% be able to issue calls to remote nodes in parallel. This requires +%% segmenting the destination Pids by node, and then getting local +%% delegates to issue calls/casts to the remote delegates in +%% parallel. In order to ensure consistent ordering, even casts to +%% remote Pids have to go through the local delegates rather than be +%% sent directly. + -define(DELEGATE_PROCESS_COUNT_MULTIPLIER, 2). -behaviour(gen_server2). diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 4911fa08..023b58fd 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -306,7 +306,7 @@ add_vhost(VHostPath) -> write), [rabbit_exchange:declare( rabbit_misc:r(VHostPath, exchange, Name), - Type, true, false, []) || + Type, true, false, false, []) || {Name,Type} <- [{<<"">>, direct}, {<<"amq.direct">>, direct}, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 71fd7a17..e3e89211 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -503,15 +503,13 @@ pseudo_queue(QueueName, Pid) -> pid = Pid}. safe_delegate_call_ok(F, Pids) -> - {_, Bad} = delegate:invoke(Pids, - fun (Pid) -> + case delegate:invoke(Pids, fun (Pid) -> rabbit_misc:with_exit_handler( fun () -> ok end, fun () -> F(Pid) end) - end), - case Bad of - [] -> ok; - _ -> {error, Bad} + end) of + {_, []} -> ok; + {_, Bad} -> {error, Bad} end. delegate_call(Pid, Msg, Timeout) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e559b9c0..981dd31d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -199,6 +199,8 @@ terminate_shutdown(Fun, State) -> BQ:tx_rollback(Txn, BQSN), BQSN1 end, BQS, all_ch_record()), + [emit_consumer_deleted(Ch, CTag) + || {Ch, CTag, _} <- consumers(State1)], rabbit_event:notify(queue_deleted, [{pid, self()}]), State1#q{backing_queue_state = Fun(BQS1)} end. @@ -536,12 +538,19 @@ remove_consumer(ChPid, ConsumerTag, Queue) -> end, Queue). remove_consumers(ChPid, Queue) -> - queue:filter(fun ({CP, _}) -> CP /= ChPid end, Queue). + {Kept, Removed} = split_by_channel(ChPid, Queue), + [emit_consumer_deleted(Ch, CTag) || + {Ch, #consumer{tag = CTag}} <- queue:to_list(Removed)], + Kept. move_consumers(ChPid, From, To) -> + {Kept, Removed} = split_by_channel(ChPid, From), + {Kept, queue:join(To, Removed)}. + +split_by_channel(ChPid, Queue) -> {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end, - queue:to_list(From)), - {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}. + queue:to_list(Queue)), + {queue:from_list(Kept), queue:from_list(Removed)}. possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of @@ -721,12 +730,34 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> i(Item, _) -> throw({bad_argument, Item}). +consumers(#q{active_consumers = ActiveConsumers, + blocked_consumers = BlockedConsumers}) -> + rabbit_misc:queue_fold( + fun ({ChPid, #consumer{tag = ConsumerTag, + ack_required = AckRequired}}, Acc) -> + [{ChPid, ConsumerTag, AckRequired} | Acc] + end, [], queue:join(ActiveConsumers, BlockedConsumers)). + emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). +emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) -> + rabbit_event:notify(consumer_created, + [{consumer_tag, ConsumerTag}, + {exclusive, Exclusive}, + {ack_required, AckRequired}, + {channel, ChPid}, + {queue, self()}]). + +emit_consumer_deleted(ChPid, ConsumerTag) -> + rabbit_event:notify(consumer_deleted, + [{consumer_tag, ConsumerTag}, + {channel, ChPid}, + {queue, self()}]). + %--------------------------------------------------------------------------- prioritise_call(Msg, _From, _State) -> @@ -789,14 +820,8 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call(consumers, _From, - State = #q{active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers}) -> - reply(rabbit_misc:queue_fold( - fun ({ChPid, #consumer{tag = ConsumerTag, - ack_required = AckRequired}}, Acc) -> - [{ChPid, ConsumerTag, AckRequired} | Acc] - end, [], queue:join(ActiveConsumers, BlockedConsumers)), State); +handle_call(consumers, _From, State) -> + reply(consumers(State), State); handle_call({deliver_immediately, Delivery = #delivery{message = Message}}, _From, State) -> @@ -899,6 +924,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, ChPid, Consumer, State1#q.active_consumers)}) end, + emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + not NoAck), reply(ok, State2) end; @@ -917,6 +944,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, C1#cr{limiter_pid = undefined}; _ -> C1 end), + emit_consumer_deleted(ChPid, ConsumerTag), ok = maybe_send_reply(ChPid, OkMsg), NewState = State#q{exclusive_consumer = cancel_holder(ChPid, diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl index 8758f85f..e5f8f3e6 100644 --- a/src/rabbit_auth_mechanism_plain.erl +++ b/src/rabbit_auth_mechanism_plain.erl @@ -56,6 +56,11 @@ init(_Sock) -> []. handle_response(Response, _State) -> - [User, Pass] = [list_to_binary(T) || - T <- string:tokens(binary_to_list(Response), [0])], - rabbit_access_control:check_user_pass_login(User, Pass). + %% The '%%"' at the end of the next line is for Emacs + case re:run(Response, "^\\0([^\\0]*)\\0([^\\0]*)$",%%" + [{capture, all_but_first, binary}]) of + {match, [User, Pass]} -> + rabbit_access_control:check_user_pass_login(User, Pass); + _ -> + {protocol_error, "response ~p invalid", [Response]} + end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d50599c1..2ad42c8f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -400,6 +400,13 @@ check_write_permitted(Resource, #ch{user = User}) -> check_read_permitted(Resource, #ch{user = User}) -> check_resource_access(User, Resource, read). +check_internal_exchange(#exchange{name = Name, internal = true}) -> + rabbit_misc:protocol_error(access_refused, + "cannot publish to internal ~s", + [rabbit_misc:rs(Name)]); +check_internal_exchange(_) -> + ok. + expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> rabbit_misc:protocol_error( not_found, "no previously declared queue", []); @@ -540,6 +547,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), + check_internal_exchange(Exchange), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), @@ -772,7 +780,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, passive = false, durable = Durable, auto_delete = AutoDelete, - internal = false, + internal = Internal, nowait = NoWait, arguments = Args}, _, State = #ch{virtual_host = VHostPath}) -> @@ -795,10 +803,11 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, CheckedType, Durable, AutoDelete, + Internal, Args) end, ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable, - AutoDelete, Args), + AutoDelete, Internal, Args), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 42861f86..dd009c83 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -49,7 +49,7 @@ boot() -> init([DefaultVHost]) -> #exchange{} = rabbit_exchange:declare( rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME), - topic, true, false, []), + topic, true, false, false, []), {ok, #resource{virtual_host = DefaultVHost, kind = exchange, name = ?LOG_EXCH_NAME}}. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 7414c904..a95cf0b1 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -33,11 +33,11 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0, +-export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). %% this must be run inside a mnesia tx -export([maybe_auto_delete/1]). --export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]). +-export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]). %%---------------------------------------------------------------------------- @@ -49,13 +49,14 @@ -type(type() :: atom()). -spec(recover/0 :: () -> 'ok'). --spec(declare/5 :: - (name(), type(), boolean(), boolean(), rabbit_framing:amqp_table()) +-spec(declare/6 :: + (name(), type(), boolean(), boolean(), boolean(), + rabbit_framing:amqp_table()) -> rabbit_types:exchange()). -spec(check_type/1 :: (binary()) -> atom() | rabbit_types:connection_exit()). --spec(assert_equivalence/5 :: - (rabbit_types:exchange(), atom(), boolean(), boolean(), +-spec(assert_equivalence/6 :: + (rabbit_types:exchange(), atom(), boolean(), boolean(), boolean(), rabbit_framing:amqp_table()) -> 'ok' | rabbit_types:connection_exit()). -spec(assert_args_equivalence/2 :: @@ -90,7 +91,7 @@ %%---------------------------------------------------------------------------- --define(INFO_KEYS, [name, type, durable, auto_delete, arguments]). +-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]). recover() -> Xs = rabbit_misc:table_fold( @@ -113,11 +114,12 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> recover_with_bindings([], [], []) -> ok. -declare(XName, Type, Durable, AutoDelete, Args) -> +declare(XName, Type, Durable, AutoDelete, Internal, Args) -> X = #exchange{name = XName, type = Type, durable = Durable, auto_delete = AutoDelete, + internal = Internal, arguments = Args}, %% We want to upset things if it isn't ok; this is different from %% the other hooks invocations, where we tend to ignore the return @@ -170,14 +172,16 @@ check_type(TypeBin) -> assert_equivalence(X = #exchange{ durable = Durable, auto_delete = AutoDelete, + internal = Internal, type = Type}, - Type, Durable, AutoDelete, RequiredArgs) -> + Type, Durable, AutoDelete, Internal, RequiredArgs) -> (type_to_module(Type)):assert_args_equivalence(X, RequiredArgs); -assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete, - _Args) -> +assert_equivalence(#exchange{ name = Name }, + _Type, _Durable, _Internal, _AutoDelete, _Args) -> rabbit_misc:protocol_error( precondition_failed, - "cannot redeclare ~s with different type, durable or autodelete value", + "cannot redeclare ~s with different type, durable, " + "internal or autodelete value", [rabbit_misc:rs(Name)]). assert_args_equivalence(#exchange{ name = Name, arguments = Args }, @@ -215,6 +219,7 @@ i(name, #exchange{name = Name}) -> Name; i(type, #exchange{type = Type}) -> Type; i(durable, #exchange{durable = Durable}) -> Durable; i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete; +i(internal, #exchange{internal = Internal}) -> Internal; i(arguments, #exchange{arguments = Arguments}) -> Arguments; i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index c542ea66..e87ff879 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -714,24 +714,24 @@ ensure_stats_timer(State) -> handle_method0(MethodName, FieldsBin, State = #v1{connection = #connection{protocol = Protocol}}) -> - try - handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), - State) - catch exit:Reason -> - CompleteReason = case Reason of - #amqp_error{method = none} -> - Reason#amqp_error{method = MethodName}; - OtherReason -> OtherReason - end, + HandleException = + fun(R) -> case ?IS_RUNNING(State) of - true -> send_exception(State, 0, CompleteReason); + true -> send_exception(State, 0, R); %% We don't trust the client at this point - force %% them to wait for a bit so they can't DOS us with %% repeated failed logins etc. false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), - throw({channel0_error, State#v1.connection_state, - CompleteReason}) + throw({channel0_error, State#v1.connection_state, R}) end + end, + try + handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), + State) + catch exit:#amqp_error{method = none} = Reason -> + HandleException(Reason#amqp_error{method = MethodName}); + Type:Reason -> + HandleException({Type, Reason, MethodName, erlang:get_stacktrace()}) end. handle_method0(#'connection.start_ok'{mechanism = Mechanism, diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 5dd4c558..b3f8d7e2 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -27,6 +27,7 @@ -rabbit_upgrade({remove_user_scope, []}). -rabbit_upgrade({hash_passwords, []}). -rabbit_upgrade({add_ip_to_listener, []}). +-rabbit_upgrade({internal_exchanges, []}). -rabbit_upgrade({user_to_internal_user, []}). %% ------------------------------------------------------------------- @@ -36,6 +37,7 @@ -spec(remove_user_scope/0 :: () -> 'ok'). -spec(hash_passwords/0 :: () -> 'ok'). -spec(add_ip_to_listener/0 :: () -> 'ok'). +-spec(internal_exchanges/0 :: () -> 'ok'). -spec(user_to_internal_user/0 :: () -> 'ok'). -endif. @@ -73,6 +75,18 @@ add_ip_to_listener() -> end, [node, protocol, host, ip_address, port]). +internal_exchanges() -> + Tables = [rabbit_exchange, rabbit_durable_exchange], + AddInternalFun = + fun ({exchange, Name, Type, Durable, AutoDelete, Args}) -> + {exchange, Name, Type, Durable, AutoDelete, false, Args} + end, + [ ok = mnesia(T, + AddInternalFun, + [name, type, durable, auto_delete, internal, arguments]) + || T <- Tables ], + ok. + user_to_internal_user() -> mnesia( rabbit_user, |