diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-05-17 04:08:36 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-05-17 04:08:36 +0100 |
commit | 809585e9e7d1f5640e96622d618b1f7bb5b3d1dd (patch) | |
tree | da3e8e9d44135b6b02929bd0bfac3868c90d1f5d | |
parent | 4185fcfa5035d0c3301a4868c4f9c00766d2c04d (diff) | |
parent | 0d20278f234e6d356af5ff177318aec71f389217 (diff) | |
download | rabbitmq-server-809585e9e7d1f5640e96622d618b1f7bb5b3d1dd.tar.gz |
merge bug24004 into default
-rw-r--r-- | docs/rabbitmqctl.1.xml | 61 | ||||
-rw-r--r-- | include/rabbit.hrl | 1 | ||||
-rw-r--r-- | include/rabbit_exchange_type_spec.hrl | 12 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 15 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 32 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 73 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 8 | ||||
-rw-r--r-- | src/rabbit_control.erl | 18 | ||||
-rw-r--r-- | src/rabbit_direct.erl | 18 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 70 | ||||
-rw-r--r-- | src/rabbit_exchange_type.erl | 7 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange_type_headers.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 16 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 24 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 4 | ||||
-rw-r--r-- | src/rabbit_net.erl | 30 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 197 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 10 | ||||
-rw-r--r-- | src/rabbit_trace.erl | 101 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 6 | ||||
-rw-r--r-- | src/test_sup.erl | 4 | ||||
-rw-r--r-- | src/vm_memory_monitor.erl | 9 |
24 files changed, 501 insertions, 227 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 3550e5ea..62869158 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1264,6 +1264,67 @@ </varlistentry> </variablelist> </refsect2> + + <refsect2> + <title>Configuration variables</title> + <para> + Some configuration values can be changed at run time. Note + that this does not apply to all variables; many are only read + at startup - changing them will have no effect. + </para> + <variablelist> + <varlistentry> + <term><cmdsynopsis><command>set_env</command> <arg choice="req"><replaceable>variable</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>variable</term> + <listitem><para>The name of the variable to set, as the string form of an Erlang term.</para></listitem> + </varlistentry> + <varlistentry> + <term>value</term> + <listitem><para>The value to set it to, as the string form of an Erlang term.</para></listitem> + </varlistentry> + </variablelist> + <para> + Set the value of a configuration variable. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>get_env</command> <arg choice="req"><replaceable>variable</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>variable</term> + <listitem><para>The name of the variable to get, as the string form of an Erlang term.</para></listitem> + </varlistentry> + </variablelist> + <para> + Get the value of a configuration variable, printing either + {ok,<command>Value</command>} or undefined. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>unset_env</command> <arg choice="req"><replaceable>variable</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>variable</term> + <listitem><para>The name of the variable to clear, as the string form of an Erlang term.</para></listitem> + </varlistentry> + </variablelist> + <para> + Clear the value of a configuration variable. + </para> + </listitem> + </varlistentry> + + </variablelist> + </refsect2> </refsect1> </refentry> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 9f483c30..db4773b8 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -43,6 +43,7 @@ -record(resource, {virtual_host, kind, name}). -record(exchange, {name, type, durable, auto_delete, internal, arguments}). +-record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid}). diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index c80cc196..f6283ef7 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -16,16 +16,20 @@ -ifdef(use_specs). +-type(tx() :: 'transaction' | 'none'). +-type(serial() :: pos_integer() | tx()). + -spec(description/0 :: () -> [{atom(), any()}]). +-spec(serialise_events/0 :: () -> boolean()). -spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) -> rabbit_router:match_result()). -spec(validate/1 :: (rabbit_types:exchange()) -> 'ok'). --spec(create/2 :: (boolean(), rabbit_types:exchange()) -> 'ok'). --spec(delete/3 :: (boolean(), rabbit_types:exchange(), +-spec(create/2 :: (tx(), rabbit_types:exchange()) -> 'ok'). +-spec(delete/3 :: (tx(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). --spec(add_binding/3 :: (boolean(), rabbit_types:exchange(), +-spec(add_binding/3 :: (serial(), rabbit_types:exchange(), rabbit_types:binding()) -> 'ok'). --spec(remove_bindings/3 :: (boolean(), rabbit_types:exchange(), +-spec(remove_bindings/3 :: (serial(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). -spec(assert_args_equivalence/2 :: (rabbit_types:exchange(), rabbit_framing:amqp_table()) diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index b0b57af4..59c00848 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([user_pass_login/2, check_user_pass_login/2, check_user_login/2, +-export([check_user_pass_login/2, check_user_login/2, check_vhost_access/2, check_resource_access/3, list_vhosts/2]). %%---------------------------------------------------------------------------- @@ -30,9 +30,6 @@ -type(permission_atom() :: 'configure' | 'read' | 'write'). -type(vhost_permission_atom() :: 'read' | 'write'). --spec(user_pass_login/2 :: - (rabbit_types:username(), rabbit_types:password()) - -> rabbit_types:user() | rabbit_types:channel_exit()). -spec(check_user_pass_login/2 :: (rabbit_types:username(), rabbit_types:password()) -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}). @@ -49,16 +46,6 @@ %%---------------------------------------------------------------------------- -user_pass_login(User, Pass) -> - ?LOGDEBUG("Login with user ~p pass ~p~n", [User, Pass]), - case check_user_pass_login(User, Pass) of - {refused, Msg, Args} -> - rabbit_misc:protocol_error( - access_refused, "login refused: ~s", [io_lib:format(Msg, Args)]); - {ok, U} -> - U - end. - check_user_pass_login(Username, Password) -> check_user_login(Username, [{password, Password}]). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7bb90fd9..e58e67ad 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -139,8 +139,8 @@ -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit() | - fun ((boolean()) -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit())). + fun (() -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit())). -spec(run_backing_queue/3 :: (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). @@ -215,7 +215,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> case mnesia:read({rabbit_durable_queue, QueueName}) of [] -> ok = store_queue(Q), B = add_default_binding(Q), - fun (Tx) -> B(Tx), Q end; + fun () -> B(), Q end; %% Q exists on stopped node [_] -> rabbit_misc:const(not_found) end; @@ -223,7 +223,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> case rabbit_misc:is_process_alive(QPid) of true -> rabbit_misc:const(ExistingQ); false -> TailFun = internal_delete(QueueName), - fun (Tx) -> TailFun(Tx), ExistingQ end + fun () -> TailFun(), ExistingQ end end end end). @@ -434,9 +434,7 @@ internal_delete(QueueName) -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> rabbit_misc:const({error, not_found}); [_] -> Deletions = internal_delete1(QueueName), - fun (Tx) -> ok = rabbit_binding:process_deletions( - Deletions, Tx) - end + rabbit_binding:process_deletions(Deletions) end end). @@ -465,18 +463,14 @@ drop_expired(QPid) -> gen_server2:cast(QPid, drop_expired). on_node_down(Node) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) - end, - fun (Deletions, Tx) -> - rabbit_binding:process_deletions( - lists:foldl(fun rabbit_binding:combine_deletions/2, - rabbit_binding:new_deletions(), - Deletions), - Tx) + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) || + #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])), + rabbit_binding:process_deletions( + lists:foldl(fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), Dels)) end). delete_queue(QueueName) -> diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index dc119fbd..2f71bfab 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -21,7 +21,7 @@ -export([list_for_source/1, list_for_destination/1, list_for_source_and_destination/2]). -export([new_deletions/0, combine_deletions/2, add_deletion/3, - process_deletions/2]). + process_deletions/1]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_source/1, remove_for_source/1, @@ -77,7 +77,7 @@ (rabbit_types:binding_destination()) -> deletions()). -spec(remove_transient_for_destination/1 :: (rabbit_types:binding_destination()) -> deletions()). --spec(process_deletions/2 :: (deletions(), boolean()) -> 'ok'). +-spec(process_deletions/1 :: (deletions()) -> rabbit_misc:thunk('ok')). -spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()). -spec(add_deletion/3 :: (rabbit_exchange:name(), {'undefined' | rabbit_types:exchange(), @@ -114,12 +114,14 @@ recover(XNames, QNames) -> end) end, fun (R = #route{binding = B = #binding{source = Src}}, Tx) -> - case Tx of - true -> ok = sync_transient_route(R, fun mnesia:write/3); - false -> ok - end, {ok, X} = rabbit_exchange:lookup(Src), - rabbit_exchange:callback(X, add_binding, [Tx, X, B]) + Serial = case Tx of + true -> ok = sync_transient_route( + R, fun mnesia:write/3), + transaction; + false -> rabbit_exchange:serial(X) + end, + rabbit_exchange:callback(X, add_binding, [Serial, X, B]) end, rabbit_semi_durable_route), ok. @@ -142,7 +144,7 @@ add(Binding, InnerFun) -> case InnerFun(Src, Dst) of ok -> case mnesia:read({rabbit_route, B}) of [] -> add(Src, Dst, B); - [_] -> fun rabbit_misc:const_ok/1 + [_] -> fun rabbit_misc:const_ok/0 end; {error, _} = Err -> rabbit_misc:const(Err) end @@ -154,10 +156,13 @@ add(Src, Dst, B) -> mnesia:read({rabbit_durable_route, B}) =:= []) of true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable, fun mnesia:write/3), - fun (Tx) -> ok = rabbit_exchange:callback(Src, add_binding, - [Tx, Src, B]), - rabbit_event:notify_if(not Tx, binding_created, - info(B)) + ok = rabbit_exchange:callback( + Src, add_binding, [transaction, Src, B]), + Serial = rabbit_exchange:serial(Src), + fun () -> + ok = rabbit_exchange:callback( + Src, add_binding, [Serial, Src, B]), + ok = rabbit_event:notify(binding_created, info(B)) end; false -> rabbit_misc:const({error, binding_not_found}) end. @@ -181,7 +186,7 @@ remove(Src, Dst, B) -> ok = sync_route(#route{binding = B}, durable(Src), durable(Dst), fun mnesia:delete_object/3), Deletions = maybe_auto_delete(B#binding.source, [B], new_deletions()), - fun (Tx) -> ok = process_deletions(Deletions, Tx) end. + process_deletions(Deletions). list(VHostPath) -> VHostResource = rabbit_misc:r(VHostPath, '_'), @@ -407,19 +412,29 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> anything_but(not_deleted, Deleted1, Deleted2), [Bindings1 | Bindings2]}. -process_deletions(Deletions, Tx) -> - dict:fold( - fun (_XName, {X, Deleted, Bindings}, ok) -> - FlatBindings = lists:flatten(Bindings), - [rabbit_event:notify_if(not Tx, binding_deleted, info(B)) || - B <- FlatBindings], - case Deleted of - not_deleted -> - rabbit_exchange:callback(X, remove_bindings, - [Tx, X, FlatBindings]); - deleted -> - rabbit_event:notify_if(not Tx, exchange_deleted, - [{name, X#exchange.name}]), - rabbit_exchange:callback(X, delete, [Tx, X, FlatBindings]) - end - end, ok, Deletions). +process_deletions(Deletions) -> + AugmentedDeletions = + dict:map(fun (_XName, {X, deleted, Bindings}) -> + Bs = lists:flatten(Bindings), + x_callback(transaction, X, delete, Bs), + {X, deleted, Bs, none}; + (_XName, {X, not_deleted, Bindings}) -> + Bs = lists:flatten(Bindings), + x_callback(transaction, X, remove_bindings, Bs), + {X, not_deleted, Bs, rabbit_exchange:serial(X)} + end, Deletions), + fun() -> + dict:fold(fun (XName, {X, deleted, Bs, Serial}, ok) -> + ok = rabbit_event:notify( + exchange_deleted, [{name, XName}]), + del_notify(Bs), + x_callback(Serial, X, delete, Bs); + (_XName, {X, not_deleted, Bs, Serial}, ok) -> + del_notify(Bs), + x_callback(Serial, X, remove_bindings, Bs) + end, ok, AugmentedDeletions) + end. + +del_notify(Bs) -> [rabbit_event:notify(binding_deleted, info(B)) || B <- Bs]. + +x_callback(Arg, X, F, Bs) -> ok = rabbit_exchange:callback(X, F, [Arg, X, Bs]). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0c12614c..f0788862 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -281,6 +281,7 @@ handle_cast({deliver, ConsumerTag, AckRequired, true -> deliver; false -> deliver_no_ack end, State), + rabbit_trace:tap_trace_out(Msg), noreply(State1#ch{next_tag = DeliveryTag + 1}); handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> @@ -303,7 +304,10 @@ handle_info({'DOWN', MRef, process, QPid, Reason}, handle_publishing_queue_down(QPid, Reason, State); {ok, ConsumerTag} -> handle_consuming_queue_down(MRef, ConsumerTag, State) - end). + end); + +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}. handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), @@ -604,6 +608,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> + rabbit_trace:tap_trace_in(Message), {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -672,6 +677,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, true -> get_no_ack; false -> get end, State), + rabbit_trace:tap_trace_out(Msg), ok = rabbit_writer:send_command( WriterPid, #'basic.get_ok'{delivery_tag = DeliveryTag, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 1af91f4c..6ab07111 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -282,6 +282,19 @@ action(list_consumers, Node, _Args, Opts, Inform) -> Other -> Other end; +action(set_env, Node, [Var, Term], _Opts, Inform) -> + Inform("Setting control variable ~s for node ~p to ~s", [Var, Node, Term]), + rpc_call(Node, application, set_env, [rabbit, parse(Var), parse(Term)]); + +action(get_env, Node, [Var], _Opts, Inform) -> + Inform("Getting control variable ~s for node ~p", [Var, Node]), + Val = rpc_call(Node, application, get_env, [rabbit, parse(Var)]), + io:format("~p~n", [Val]); + +action(unset_env, Node, [Var], _Opts, Inform) -> + Inform("Clearing control variable ~s for node ~p", [Var, Node]), + rpc_call(Node, application, unset_env, [rabbit, parse(Var)]); + action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), @@ -325,6 +338,11 @@ default_if_empty(List, Default) when is_list(List) -> true -> [list_to_atom(X) || X <- List] end. +parse(Str) -> + {ok, Tokens, _} = erl_scan:string(Str ++ "."), + {ok, Term} = erl_parse:parse_term(Tokens), + Term. + display_info_list(Results, InfoItemKeys) when is_list(Results) -> lists:foreach( fun (Result) -> display_row( diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 0dac18d1..7ff534ee 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -16,7 +16,7 @@ -module(rabbit_direct). --export([boot/0, connect/5, start_channel/8, disconnect/1]). +-export([boot/0, connect/4, start_channel/8, disconnect/1]). -include("rabbit.hrl"). @@ -25,8 +25,8 @@ -ifdef(use_specs). -spec(boot/0 :: () -> 'ok'). --spec(connect/5 :: (binary(), binary(), binary(), rabbit_types:protocol(), - rabbit_event:event_props()) -> +-spec(connect/4 :: (rabbit_types:username(), rabbit_types:vhost(), + rabbit_types:protocol(), rabbit_event:event_props()) -> {'ok', {rabbit_types:user(), rabbit_framing:amqp_table()}}). -spec(start_channel/8 :: @@ -53,11 +53,11 @@ boot() -> %%---------------------------------------------------------------------------- -connect(Username, Password, VHost, Protocol, Infos) -> +connect(Username, VHost, Protocol, Infos) -> case lists:keymember(rabbit, 1, application:which_applications()) of true -> - try rabbit_access_control:user_pass_login(Username, Password) of - #user{} = User -> + case rabbit_access_control:check_user_login(Username, []) of + {ok, User} -> try rabbit_access_control:check_vhost_access(User, VHost) of ok -> rabbit_event:notify(connection_created, Infos), {ok, {User, @@ -65,9 +65,9 @@ connect(Username, Password, VHost, Protocol, Infos) -> catch exit:#amqp_error{name = access_refused} -> {error, access_refused} - end - catch - exit:#amqp_error{name = access_refused} -> {error, auth_failure} + end; + {refused, _Msg, _Args} -> + {error, auth_failure} end; false -> {error, broker_not_found_on_node} diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 42111773..84a44cd2 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -23,8 +23,8 @@ lookup/1, lookup_or_die/1, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). -%% this must be run inside a mnesia tx --export([maybe_auto_delete/1]). +%% these must be run inside a mnesia tx +-export([maybe_auto_delete/1, serial/1]). %%---------------------------------------------------------------------------- @@ -75,6 +75,7 @@ -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). +-spec(serial/1:: (rabbit_types:exchange()) -> 'none' | pos_integer()). -endif. @@ -89,7 +90,7 @@ recover() -> end, fun (X, Tx) -> case Tx of - true -> ok = mnesia:write(rabbit_exchange, X, write); + true -> store(X); false -> ok end, rabbit_exchange:callback(X, create, [Tx, X]) @@ -107,13 +108,14 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> auto_delete = AutoDelete, internal = Internal, arguments = Args}, + XT = type_to_module(Type), %% We want to upset things if it isn't ok - ok = (type_to_module(Type)):validate(X), + ok = XT:validate(X), rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_exchange, XName}) of [] -> - ok = mnesia:write(rabbit_exchange, X, write), + store(X), ok = case Durable of true -> mnesia:write(rabbit_durable_exchange, X, write); @@ -125,7 +127,10 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> end end, fun ({new, Exchange}, Tx) -> - ok = (type_to_module(Type)):create(Tx, Exchange), + ok = XT:create(case Tx of + true -> transaction; + false -> none + end, Exchange), rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)), Exchange; ({existing, Exchange}, _Tx) -> @@ -134,6 +139,14 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> Err end). +store(X = #exchange{name = Name, type = Type}) -> + ok = mnesia:write(rabbit_exchange, X, write), + case (type_to_module(Type)):serialise_events() of + true -> S = #exchange_serial{name = Name, next = 1}, + ok = mnesia:write(rabbit_exchange_serial, S, write); + false -> ok + end. + %% Used with binaries sent over the wire; the type may not exist. check_type(TypeBin) -> case rabbit_registry:binary_to_type(TypeBin) of @@ -257,27 +270,30 @@ process_route(#resource{kind = queue} = QName, {WorkList, SeenXs, QNames}) -> {WorkList, SeenXs, [QName | QNames]}. -call_with_exchange(XName, Fun, PrePostCommitFun) -> - rabbit_misc:execute_mnesia_transaction( +call_with_exchange(XName, Fun) -> + rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case mnesia:read({rabbit_exchange, XName}) of - [] -> {error, not_found}; + [] -> rabbit_misc:const({error, not_found}); [X] -> Fun(X) end - end, PrePostCommitFun). + end). delete(XName, IfUnused) -> + Fun = case IfUnused of + true -> fun conditional_delete/1; + false -> fun unconditional_delete/1 + end, call_with_exchange( XName, - case IfUnused of - true -> fun conditional_delete/1; - false -> fun unconditional_delete/1 - end, - fun ({deleted, X, Bs, Deletions}, Tx) -> - ok = rabbit_binding:process_deletions( - rabbit_binding:add_deletion( - XName, {X, deleted, Bs}, Deletions), Tx); - (Error = {error, _InUseOrNotFound}, _Tx) -> - Error + fun (X) -> + case Fun(X) of + {deleted, X, Bs, Deletions} -> + rabbit_binding:process_deletions( + rabbit_binding:add_deletion( + XName, {X, deleted, Bs}, Deletions)); + {error, _InUseOrNotFound} = E -> + rabbit_misc:const(E) + end end). maybe_auto_delete(#exchange{auto_delete = false}) -> @@ -297,9 +313,23 @@ conditional_delete(X = #exchange{name = XName}) -> unconditional_delete(X = #exchange{name = XName}) -> ok = mnesia:delete({rabbit_durable_exchange, XName}), ok = mnesia:delete({rabbit_exchange, XName}), + ok = mnesia:delete({rabbit_exchange_serial, XName}), Bindings = rabbit_binding:remove_for_source(XName), {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. +serial(#exchange{name = XName, type = Type}) -> + case (type_to_module(Type)):serialise_events() of + true -> next_serial(XName); + false -> none + end. + +next_serial(XName) -> + [#exchange_serial{next = Serial}] = + mnesia:read(rabbit_exchange_serial, XName, write), + ok = mnesia:write(rabbit_exchange_serial, + #exchange_serial{name = XName, next = Serial + 1}, write), + Serial. + %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> {ok, Module} = rabbit_registry:lookup_module(exchange, T), diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index cd96407c..ab3d00dc 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -21,6 +21,13 @@ behaviour_info(callbacks) -> [ {description, 0}, + + %% Should Rabbit ensure that all binding events that are + %% delivered to an individual exchange can be serialised? (they + %% might still be delivered out of order, but there'll be a + %% serial number). + {serialise_events, 0}, + {route, 2}, %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 40078b1a..b485e31f 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -19,7 +19,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, serialise_events/0, route/2]). -export([validate/1, create/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -35,6 +35,8 @@ description() -> [{name, <<"direct">>}, {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. +serialise_events() -> false. + route(#exchange{name = Name}, #delivery{message = #basic_message{routing_keys = Routes}}) -> rabbit_router:match_routing_key(Name, Routes). diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index f32ef917..3c029722 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -19,7 +19,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, serialise_events/0, route/2]). -export([validate/1, create/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -35,6 +35,8 @@ description() -> [{name, <<"fanout">>}, {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. +serialise_events() -> false. + route(#exchange{name = Name}, _Delivery) -> rabbit_router:match_routing_key(Name, ['_']). diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 139feb04..f09e4aae 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, serialise_events/0, route/2]). -export([validate/1, create/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -41,6 +41,8 @@ description() -> [{name, <<"headers">>}, {description, <<"AMQP headers exchange, as per the AMQP specification">>}]. +serialise_events() -> false. + route(#exchange{name = Name}, #delivery{message = #basic_message{content = Content}}) -> Headers = case (Content#content.properties)#'P_basic'.headers of diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 74c566b8..348655b1 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, serialise_events/0, route/2]). -export([validate/1, create/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -38,6 +38,8 @@ description() -> [{name, <<"topic">>}, {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. +serialise_events() -> false. + %% NB: This may return duplicate results in some situations (that's ok) route(#exchange{name = X}, #delivery{message = #basic_message{routing_keys = Routes}}) -> @@ -49,19 +51,19 @@ route(#exchange{name = X}, validate(_X) -> ok. create(_Tx, _X) -> ok. -delete(true, #exchange{name = X}, _Bs) -> +delete(transaction, #exchange{name = X}, _Bs) -> trie_remove_all_edges(X), trie_remove_all_bindings(X), ok; -delete(false, _Exchange, _Bs) -> +delete(none, _Exchange, _Bs) -> ok. -add_binding(true, _Exchange, Binding) -> +add_binding(transaction, _Exchange, Binding) -> internal_add_binding(Binding); -add_binding(false, _Exchange, _Binding) -> +add_binding(none, _Exchange, _Binding) -> ok. -remove_bindings(true, #exchange{name = X}, Bs) -> +remove_bindings(transaction, #exchange{name = X}, Bs) -> %% The remove process is split into two distinct phases. In the %% first phase we gather the lists of bindings and edges to %% delete, then in the second phase we process all the @@ -80,7 +82,7 @@ remove_bindings(true, #exchange{name = X}, Bs) -> [trie_remove_edge(X, Parent, Node, W) || {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)], ok; -remove_bindings(false, _X, _Bs) -> +remove_bindings(none, _X, _Bs) -> ok. maybe_add_path(_X, [{root, none}], PathAcc) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 2d433ac2..53171e87 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -53,7 +53,7 @@ -export([all_module_attributes/1, build_acyclic_graph/3]). -export([now_ms/0]). -export([lock_file/1]). --export([const_ok/1, const/1]). +-export([const_ok/0, const/1]). -export([ntoa/1, ntoab/1]). -export([is_process_alive/1]). @@ -61,11 +61,10 @@ -ifdef(use_specs). --export_type([resource_name/0, thunk/1, const/1]). +-export_type([resource_name/0, thunk/1]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). --type(const(T) :: fun((any()) -> T)). -type(resource_name() :: binary()). -type(optdef() :: {flag, string()} | {option, string(), any()}). -type(channel_or_connection_exit() @@ -192,8 +191,8 @@ digraph:vertex(), digraph:vertex()})). -spec(now_ms/0 :: () -> non_neg_integer()). -spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')). --spec(const_ok/1 :: (any()) -> 'ok'). --spec(const/1 :: (A) -> const(A)). +-spec(const_ok/0 :: () -> 'ok'). +-spec(const/1 :: (A) -> thunk(A)). -spec(ntoa/1 :: (inet:ip_address()) -> string()). -spec(ntoab/1 :: (inet:ip_address()) -> string()). -spec(is_process_alive/1 :: (pid()) -> boolean()). @@ -406,17 +405,12 @@ execute_mnesia_transaction(TxFun, PrePostCommitFun) -> end), false). %% Like execute_mnesia_transaction/2, but TxFun is expected to return a -%% TailFun which gets called immediately before and after the tx commit +%% TailFun which gets called (only) immediately after the tx commit execute_mnesia_tx_with_tail(TxFun) -> case mnesia:is_transaction() of true -> execute_mnesia_transaction(TxFun); - false -> TailFun = execute_mnesia_transaction( - fun () -> - TailFun1 = TxFun(), - TailFun1(true), - TailFun1 - end), - TailFun(false) + false -> TailFun = execute_mnesia_transaction(TxFun), + TailFun() end. ensure_ok(ok, _) -> ok; @@ -879,8 +873,8 @@ lock_file(Path) -> ok = file:close(Lock) end. -const_ok(_) -> ok. -const(X) -> fun (_) -> X end. +const_ok() -> ok. +const(X) -> fun () -> X end. %% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see %% when IPv6 is enabled but not used (i.e. 99% of the time). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 77b06d0c..2df76d4e 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -225,6 +225,10 @@ table_definitions() -> [{record_name, exchange}, {attributes, record_info(fields, exchange)}, {match, #exchange{name = exchange_name_match(), _='_'}}]}, + {rabbit_exchange_serial, + [{record_name, exchange_serial}, + {attributes, record_info(fields, exchange_serial)}, + {match, #exchange_serial{name = exchange_name_match(), _='_'}}]}, {rabbit_durable_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index c500548a..b944ec81 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). -export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, - async_recv/3, port_command/2, send/2, close/1, + recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1, sockname/1, peername/1, peercert/1]). %%--------------------------------------------------------------------------- @@ -28,8 +28,8 @@ -export_type([socket/0]). -type(stat_option() :: - 'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' | - 'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend'). + 'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' | + 'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend'). -type(ok_val_or_error(A) :: rabbit_types:ok_or_error2(A, any())). -type(ok_or_any_error() :: rabbit_types:ok_or_error(any())). -type(socket() :: port() | #ssl_socket{}). @@ -42,9 +42,15 @@ -spec(getstat/2 :: (socket(), [stat_option()]) -> ok_val_or_error([{stat_option(), integer()}])). +-spec(recv/1 :: (socket()) -> + {'data', [char()] | binary()} | 'closed' | + rabbit_types:error(any()) | {'other', any()}). -spec(async_recv/3 :: (socket(), integer(), timeout()) -> rabbit_types:ok(any())). -spec(port_command/2 :: (socket(), iolist()) -> 'true'). +-spec(setopts/2 :: (socket(), [{atom(), any()} | + {raw, non_neg_integer(), non_neg_integer(), + binary()}]) -> ok_or_any_error()). -spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()). -spec(close/1 :: (socket()) -> ok_or_any_error()). -spec(sockname/1 :: @@ -80,6 +86,19 @@ getstat(Sock, Stats) when ?IS_SSL(Sock) -> getstat(Sock, Stats) when is_port(Sock) -> inet:getstat(Sock, Stats). +recv(Sock) when ?IS_SSL(Sock) -> + recv(Sock#ssl_socket.ssl, {ssl, ssl_closed, ssl_error}); +recv(Sock) when is_port(Sock) -> + recv(Sock, {tcp, tcp_closed, tcp_error}). + +recv(S, {DataTag, ClosedTag, ErrorTag}) -> + receive + {DataTag, S, Data} -> {data, Data}; + {ClosedTag, S} -> closed; + {ErrorTag, S, Reason} -> {error, Reason}; + Other -> {other, Other} + end. + async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) -> Pid = self(), Ref = make_ref(), @@ -103,6 +122,11 @@ port_command(Sock, Data) when ?IS_SSL(Sock) -> port_command(Sock, Data) when is_port(Sock) -> erlang:port_command(Sock, Data). +setopts(Sock, Options) when ?IS_SSL(Sock) -> + ssl:setopts(Sock#ssl_socket.ssl, Options); +setopts(Sock, Options) when is_port(Sock) -> + inet:setopts(Sock, Options). + send(Sock, Data) when ?IS_SSL(Sock) -> ssl:send(Sock#ssl_socket.ssl, Data); send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 42af91a8..f5214a77 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -38,10 +38,10 @@ %%-------------------------------------------------------------------------- --record(v1, {parent, sock, connection, callback, recv_length, recv_ref, +-record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, - channel_sup_sup_pid, start_heartbeat_fun, auth_mechanism, - auth_state}). + channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, + auth_mechanism, auth_state}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). @@ -192,7 +192,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), try - mainloop(Deb, switch_callback( + recvloop(Deb, switch_callback( #v1{parent = Parent, sock = ClientSock, connection = #connection{ @@ -204,8 +204,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, client_properties = none, capabilities = []}, callback = uninitialized_callback, - recv_length = 0, - recv_ref = none, + recv_len = 0, + pending_recv = false, connection_state = pre_init, queue_collector = Collector, heartbeater = none, @@ -213,6 +213,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, rabbit_event:init_stats_timer(), channel_sup_sup_pid = ChannelSupSupPid, start_heartbeat_fun = StartHeartbeatFun, + buf = [], + buf_len = 0, auth_mechanism = none, auth_state = none }, @@ -237,92 +239,104 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, end, done. -mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> - receive - {inet_async, Sock, Ref, {ok, Data}} -> - mainloop(Deb, handle_input(State#v1.callback, Data, - State#v1{recv_ref = none})); - {inet_async, Sock, Ref, {error, closed}} -> - if State#v1.connection_state =:= closed -> - State; - true -> - throw(connection_closed_abruptly) - end; - {inet_async, Sock, Ref, {error, Reason}} -> - throw({inet_error, Reason}); - {conserve_memory, Conserve} -> - mainloop(Deb, internal_conserve_memory(Conserve, State)); - {channel_closing, ChPid} -> - ok = rabbit_channel:ready_for_close(ChPid), - channel_cleanup(ChPid), - mainloop(Deb, State); - {'EXIT', Parent, Reason} -> - terminate(io_lib:format("broker forced connection closure " - "with reason '~w'", [Reason]), State), - %% this is what we are expected to do according to - %% http://www.erlang.org/doc/man/sys.html - %% - %% If we wanted to be *really* nice we should wait for a - %% while for clients to close the socket at their end, - %% just as we do in the ordinary error case. However, - %% since this termination is initiated by our parent it is - %% probably more important to exit quickly. - exit(Reason); - {channel_exit, _Channel, E = {writer, send_failed, _Error}} -> - throw(E); - {channel_exit, Channel, Reason} -> - mainloop(Deb, handle_exception(State, Channel, Reason)); - {'DOWN', _MRef, process, ChPid, Reason} -> - mainloop(Deb, handle_dependent_exit(ChPid, Reason, State)); - terminate_connection -> - State; - handshake_timeout -> - if ?IS_RUNNING(State) orelse - State#v1.connection_state =:= closing orelse - State#v1.connection_state =:= closed -> - mainloop(Deb, State); - true -> - throw({handshake_timeout, State#v1.callback}) - end; - timeout -> - case State#v1.connection_state of - closed -> mainloop(Deb, State); - S -> throw({timeout, S}) - end; - {'$gen_call', From, {shutdown, Explanation}} -> - {ForceTermination, NewState} = terminate(Explanation, State), - gen_server:reply(From, ok), - case ForceTermination of - force -> ok; - normal -> mainloop(Deb, NewState) - end; - {'$gen_call', From, info} -> - gen_server:reply(From, infos(?INFO_KEYS, State)), - mainloop(Deb, State); - {'$gen_call', From, {info, Items}} -> - gen_server:reply(From, try {ok, infos(Items, State)} - catch Error -> {error, Error} - end), - mainloop(Deb, State); - {'$gen_cast', emit_stats} -> - State1 = internal_emit_stats(State), - mainloop(Deb, State1); - {system, From, Request} -> - sys:handle_system_msg(Request, From, - Parent, ?MODULE, Deb, State); - Other -> - %% internal error -> something worth dying for - exit({unexpected_message, Other}) +recvloop(Deb, State = #v1{pending_recv = true}) -> + mainloop(Deb, State); +recvloop(Deb, State = #v1{connection_state = blocked}) -> + mainloop(Deb, State); +recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen}) + when BufLen < RecvLen -> + ok = rabbit_net:setopts(Sock, [{active, once}]), + mainloop(Deb, State#v1{pending_recv = true}); +recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) -> + {Data, Rest} = split_binary(case Buf of + [B] -> B; + _ -> list_to_binary(lists:reverse(Buf)) + end, RecvLen), + recvloop(Deb, handle_input(State#v1.callback, Data, + State#v1{buf = [Rest], + buf_len = BufLen - RecvLen})). + +mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> + case rabbit_net:recv(Sock) of + {data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf], + buf_len = BufLen + size(Data), + pending_recv = false}); + closed -> if State#v1.connection_state =:= closed -> + State; + true -> + throw(connection_closed_abruptly) + end; + {error, Reason} -> throw({inet_error, Reason}); + {other, Other} -> handle_other(Other, Deb, State) end. +handle_other({conserve_memory, Conserve}, Deb, State) -> + recvloop(Deb, internal_conserve_memory(Conserve, State)); +handle_other({channel_closing, ChPid}, Deb, State) -> + ok = rabbit_channel:ready_for_close(ChPid), + channel_cleanup(ChPid), + mainloop(Deb, State); +handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) -> + terminate(io_lib:format("broker forced connection closure " + "with reason '~w'", [Reason]), State), + %% this is what we are expected to do according to + %% http://www.erlang.org/doc/man/sys.html + %% + %% If we wanted to be *really* nice we should wait for a while for + %% clients to close the socket at their end, just as we do in the + %% ordinary error case. However, since this termination is + %% initiated by our parent it is probably more important to exit + %% quickly. + exit(Reason); +handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}}, + _Deb, _State) -> + throw(E); +handle_other({channel_exit, Channel, Reason}, Deb, State) -> + mainloop(Deb, handle_exception(State, Channel, Reason)); +handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) -> + mainloop(Deb, handle_dependent_exit(ChPid, Reason, State)); +handle_other(terminate_connection, _Deb, State) -> + State; +handle_other(handshake_timeout, Deb, State) + when ?IS_RUNNING(State) orelse + State#v1.connection_state =:= closing orelse + State#v1.connection_state =:= closed -> + mainloop(Deb, State); +handle_other(handshake_timeout, _Deb, State) -> + throw({handshake_timeout, State#v1.callback}); +handle_other(timeout, Deb, State = #v1{connection_state = closed}) -> + mainloop(Deb, State); +handle_other(timeout, _Deb, #v1{connection_state = S}) -> + throw({timeout, S}); +handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) -> + {ForceTermination, NewState} = terminate(Explanation, State), + gen_server:reply(From, ok), + case ForceTermination of + force -> ok; + normal -> mainloop(Deb, NewState) + end; +handle_other({'$gen_call', From, info}, Deb, State) -> + gen_server:reply(From, infos(?INFO_KEYS, State)), + mainloop(Deb, State); +handle_other({'$gen_call', From, {info, Items}}, Deb, State) -> + gen_server:reply(From, try {ok, infos(Items, State)} + catch Error -> {error, Error} + end), + mainloop(Deb, State); +handle_other({'$gen_cast', emit_stats}, Deb, State) -> + mainloop(Deb, internal_emit_stats(State)); +handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); +handle_other(Other, _Deb, _State) -> + %% internal error -> something worth dying for + exit({unexpected_message, Other}). + switch_callback(State = #v1{connection_state = blocked, heartbeater = Heartbeater}, Callback, Length) -> ok = rabbit_heartbeat:pause_monitor(Heartbeater), - State#v1{callback = Callback, recv_length = Length, recv_ref = none}; + State#v1{callback = Callback, recv_len = Length}; switch_callback(State, Callback, Length) -> - Ref = inet_op(fun () -> rabbit_net:async_recv( - State#v1.sock, Length, infinity) end), - State#v1{callback = Callback, recv_length = Length, recv_ref = Ref}. + State#v1{callback = Callback, recv_len = Length}. terminate(Explanation, State) when ?IS_RUNNING(State) -> {normal, send_exception(State, 0, @@ -336,12 +350,9 @@ internal_conserve_memory(true, State = #v1{connection_state = running}) -> internal_conserve_memory(false, State = #v1{connection_state = blocking}) -> State#v1{connection_state = running}; internal_conserve_memory(false, State = #v1{connection_state = blocked, - heartbeater = Heartbeater, - callback = Callback, - recv_length = Length, - recv_ref = none}) -> + heartbeater = Heartbeater}) -> ok = rabbit_heartbeat:resume_monitor(Heartbeater), - switch_callback(State#v1{connection_state = running}, Callback, Length); + State#v1{connection_state = running}; internal_conserve_memory(_Conserve, State) -> State. @@ -513,8 +524,8 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) -> case PayloadAndMarker of <<Payload:PayloadSize/binary, ?FRAME_END>> -> - handle_frame(Type, Channel, Payload, - switch_callback(State, frame_header, 7)); + switch_callback(handle_frame(Type, Channel, Payload, State), + frame_header, 7); _ -> throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker}) end; diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 45dd39a2..3726420d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -691,10 +691,10 @@ test_topic_matching() -> test_topic_expect_match(X, [{"a.b.c", []}, {"b.b.c", []}, {"", []}]), passed. -exchange_op_callback(X, Fun, ExtraArgs) -> +exchange_op_callback(X, Fun, Args) -> rabbit_misc:execute_mnesia_transaction( - fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end), - rabbit_exchange:callback(X, Fun, [false, X] ++ ExtraArgs). + fun () -> rabbit_exchange:callback(X, Fun, [transaction, X] ++ Args) end), + rabbit_exchange:callback(X, Fun, [none, X] ++ Args). test_topic_expect_match(X, List) -> lists:foreach( @@ -1553,7 +1553,7 @@ test_logs_working(MainLogFile, SaslLogFile) -> ok = rabbit_log:error("foo bar"), ok = error_logger:error_report(crash_report, [foo, bar]), %% give the error loggers some time to catch up - timer:sleep(50), + timer:sleep(100), [true, true] = non_empty_files([MainLogFile, SaslLogFile]), ok. @@ -2074,7 +2074,7 @@ test_queue_index() -> variable_queue_init(Q, Recover) -> rabbit_variable_queue:init( - Q, Recover, fun nop/1, fun nop/1, fun nop/2, fun nop/1). + Q, Recover, fun nop/2, fun nop/2, fun nop/2, fun nop/1). variable_queue_publish(IsPersistent, Count, VQ) -> lists:foldl( diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl new file mode 100644 index 00000000..2d15e7fc --- /dev/null +++ b/src/rabbit_trace.erl @@ -0,0 +1,101 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_trace). + +-export([tap_trace_in/1, tap_trace_out/1]). + +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(tap_trace_in/1 :: (rabbit_types:basic_message()) -> 'ok'). +-spec(tap_trace_out/1 :: (rabbit_amqqueue:qmsg()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +tap_trace_in(Msg) -> + maybe_trace(Msg, <<"publish">>, xname(Msg), []). + +tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}) -> + RedeliveredNum = case Redelivered of true -> 1; false -> 0 end, + maybe_trace(Msg, <<"deliver">>, QName, + [{<<"redelivered">>, signedint, RedeliveredNum}]). + +xname(#basic_message{exchange_name = #resource{name = XName}}) -> XName. +vhost(#basic_message{exchange_name = #resource{virtual_host = VHost}}) -> VHost. + +maybe_trace(Msg, RKPrefix, RKSuffix, Extra) -> + XName = xname(Msg), + case trace_exchange(vhost(Msg)) of + none -> ok; + XName -> ok; + TraceX -> case catch trace(TraceX, Msg, RKPrefix, RKSuffix, Extra) of + {'EXIT', R} -> rabbit_log:info("Trace died: ~p~n", [R]); + ok -> ok + end + end. + +trace_exchange(VHost) -> + case application:get_env(rabbit, trace_exchanges) of + undefined -> none; + {ok, Xs} -> proplists:get_value(VHost, Xs, none) + end. + +trace(TraceX, Msg0, RKPrefix, RKSuffix, Extra) -> + Msg = ensure_content_decoded(Msg0), + rabbit_basic:publish(rabbit_misc:r(vhost(Msg), exchange, TraceX), + <<RKPrefix/binary, ".", RKSuffix/binary>>, + #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, + payload(Msg)), + ok. + +msg_to_table(#basic_message{exchange_name = #resource{name = XName}, + routing_keys = RoutingKeys, + content = #content{properties = Props}}) -> + {PropsTable, _Ix} = + lists:foldl( + fun (K, {L, Ix}) -> + V = element(Ix, Props), + NewL = case V of + undefined -> L; + _ -> [{a2b(K), type(V), V} | L] + end, + {NewL, Ix + 1} + end, {[], 2}, record_info(fields, 'P_basic')), + [{<<"exchange_name">>, longstr, XName}, + {<<"routing_keys">>, array, [{longstr, K} || K <- RoutingKeys]}, + {<<"properties">>, table, PropsTable}, + {<<"node">>, longstr, a2b(node())}]. + +payload(#basic_message{content = #content{payload_fragments_rev = PFR}}) -> + list_to_binary(lists:reverse(PFR)). + +ensure_content_decoded(Msg = #basic_message{content = Content}) -> + Msg#basic_message{content = rabbit_binary_parser:ensure_content_decoded( + Content)}. + +a2b(A) -> + list_to_binary(atom_to_list(A)). + +type(V) when is_list(V) -> table; +type(V) when is_integer(V) -> signedint; +type(_V) -> longstr. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 842c3b4f..31bbb929 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -27,6 +27,7 @@ -rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}). -rabbit_upgrade({topic_trie, mnesia, []}). -rabbit_upgrade({semi_durable_route, mnesia, []}). +-rabbit_upgrade({exchange_event_serial, mnesia, []}). %% ------------------------------------------------------------------- @@ -38,6 +39,7 @@ -spec(internal_exchanges/0 :: () -> 'ok'). -spec(user_to_internal_user/0 :: () -> 'ok'). -spec(topic_trie/0 :: () -> 'ok'). +-spec(exchange_event_serial/0 :: () -> 'ok'). -spec(semi_durable_route/0 :: () -> 'ok'). -endif. @@ -107,6 +109,10 @@ semi_durable_route() -> create(rabbit_semi_durable_route, [{record_name, route}, {attributes, [binding, value]}]). +exchange_event_serial() -> + create(rabbit_exchange_serial, [{record_name, exchange_serial}, + {attributes, [name, next]}]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/test_sup.erl b/src/test_sup.erl index 150235da..84c4121c 100644 --- a/src/test_sup.erl +++ b/src/test_sup.erl @@ -33,10 +33,10 @@ test_supervisor_delayed_restart() -> test_supervisor_delayed_restart(SupPid) -> ok = ping_child(SupPid), ok = exit_child(SupPid), - timer:sleep(10), + timer:sleep(100), ok = ping_child(SupPid), ok = exit_child(SupPid), - timer:sleep(10), + timer:sleep(100), timeout = ping_child(SupPid), timer:sleep(1010), ok = ping_child(SupPid), diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index dcc6aff5..fb2fa267 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -239,10 +239,13 @@ get_total_memory({unix,darwin}) -> PageSize * (Inactive + Active + Free + Wired); get_total_memory({unix,freebsd}) -> - PageSize = freebsd_sysctl("vm.stats.vm.v_page_size"), - PageCount = freebsd_sysctl("vm.stats.vm.v_page_count"), + PageSize = sysctl("vm.stats.vm.v_page_size"), + PageCount = sysctl("vm.stats.vm.v_page_count"), PageCount * PageSize; +get_total_memory({unix,openbsd}) -> + sysctl("hw.usermem"); + get_total_memory({win32,_OSname}) -> %% Due to the Erlang print format bug, on Windows boxes the memory %% size is broken. For example Windows 7 64 bit with 4Gigs of RAM @@ -342,7 +345,7 @@ parse_line_aix(Line) -> false -> list_to_integer(Value) end}. -freebsd_sysctl(Def) -> +sysctl(Def) -> list_to_integer(cmd("/sbin/sysctl -n " ++ Def) -- "\n"). %% file:read_file does not work on files in /proc as it seems to get |