diff options
author | Emile Joubert <emile@rabbitmq.com> | 2011-05-24 13:55:42 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2011-05-24 13:55:42 +0100 |
commit | 546a643b30d3ac02221b1dd7ad7d2db66552c113 (patch) | |
tree | 740944083c320c2f295c21003e2038bb76d7aa1d | |
parent | fc6be65ada353aa98780a3b89139f18727c9492d (diff) | |
parent | f0c385c9adebcd83b32ee2d03f498a74e8ca16bc (diff) | |
download | rabbitmq-server-546a643b30d3ac02221b1dd7ad7d2db66552c113.tar.gz |
Merged bug24107 into default
-rw-r--r-- | docs/rabbitmqctl.1.xml | 66 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
-rw-r--r-- | src/rabbit.erl | 16 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 61 | ||||
-rw-r--r-- | src/rabbit_control.erl | 29 | ||||
-rw-r--r-- | src/rabbit_trace.erl | 69 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 21 | ||||
-rw-r--r-- | src/rabbit_vhost.erl | 36 |
8 files changed, 174 insertions, 125 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 62869158..ffa01894 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -624,14 +624,31 @@ </listitem> </varlistentry> - <varlistentry> - <term><cmdsynopsis><command>list_vhosts</command></cmdsynopsis></term> + <varlistentry role="usage-has-option-list"> + <term><cmdsynopsis><command>list_vhosts</command> <arg choice="opt" role="usage-option-list"><replaceable>vhostinfoitem</replaceable> ...</arg></cmdsynopsis></term> <listitem> <para> Lists virtual hosts. </para> + <para> + The <command>vhostinfoitem</command> parameter is used to indicate which + virtual host information items to include in the results. The column order in the + results will match the order of the parameters. + <command>vhostinfoitem</command> can take any value from + the list that follows: + </para> + <variablelist> + <varlistentry> + <term>name</term> + <listitem><para>The name of the virtual host with non-ASCII characters escaped as in C.</para></listitem> + </varlistentry> + <varlistentry> + <term>tracing</term> + <listitem><para>Whether tracing is enabled for this virtual host.</para></listitem> + </varlistentry> + </variablelist> <para role="example-prefix">For example:</para> - <screen role="example">rabbitmqctl list_vhosts</screen> + <screen role="example">rabbitmqctl list_vhosts name tracing</screen> <para role="example"> This command instructs the RabbitMQ broker to list all virtual hosts. @@ -1266,59 +1283,34 @@ </refsect2> <refsect2> - <title>Configuration variables</title> - <para> - Some configuration values can be changed at run time. Note - that this does not apply to all variables; many are only read - at startup - changing them will have no effect. - </para> + <title>Message Tracing</title> <variablelist> <varlistentry> - <term><cmdsynopsis><command>set_env</command> <arg choice="req"><replaceable>variable</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term> + <term><cmdsynopsis><command>trace_on</command> <arg choice="opt">-p <replaceable>vhost</replaceable></arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> - <term>variable</term> - <listitem><para>The name of the variable to set, as the string form of an Erlang term.</para></listitem> - </varlistentry> - <varlistentry> - <term>value</term> - <listitem><para>The value to set it to, as the string form of an Erlang term.</para></listitem> - </varlistentry> - </variablelist> - <para> - Set the value of a configuration variable. - </para> - </listitem> - </varlistentry> - - <varlistentry> - <term><cmdsynopsis><command>get_env</command> <arg choice="req"><replaceable>variable</replaceable></arg></cmdsynopsis></term> - <listitem> - <variablelist> - <varlistentry> - <term>variable</term> - <listitem><para>The name of the variable to get, as the string form of an Erlang term.</para></listitem> + <term>vhost</term> + <listitem><para>The name of the virtual host for which to start tracing.</para></listitem> </varlistentry> </variablelist> <para> - Get the value of a configuration variable, printing either - {ok,<command>Value</command>} or undefined. + Starts tracing. </para> </listitem> </varlistentry> <varlistentry> - <term><cmdsynopsis><command>unset_env</command> <arg choice="req"><replaceable>variable</replaceable></arg></cmdsynopsis></term> + <term><cmdsynopsis><command>trace_off</command> <arg choice="opt">-p <replaceable>vhost</replaceable></arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> - <term>variable</term> - <listitem><para>The name of the variable to clear, as the string form of an Erlang term.</para></listitem> + <term>vhost</term> + <listitem><para>The name of the virtual host for which to stop tracing.</para></listitem> </varlistentry> </variablelist> <para> - Clear the value of a configuration variable. + Stops tracing. </para> </listitem> </varlistentry> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 014c18b0..7dabb8c3 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -36,6 +36,7 @@ {auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, {auth_backends, [rabbit_auth_backend_internal]}, {delegate_count, 16}, + {trace_vhosts, []}, {tcp_listen_options, [binary, {packet, raw}, {reuseaddr, true}, diff --git a/src/rabbit.erl b/src/rabbit.erl index 9052f2f9..e6e80b4a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -25,8 +25,6 @@ -export([log_location/1]). --export([get_env/1, set_env/2, unset_env/1]). - %%--------------------------------------------------------------------------- %% Boot steps. -export([maybe_insert_default_data/0, boot_delegate/0, recover/0]). @@ -188,9 +186,6 @@ -spec(maybe_insert_default_data/0 :: () -> 'ok'). -spec(boot_delegate/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). --spec(get_env/1 :: (atom()) -> term()). --spec(set_env/2 :: (atom(), term()) -> 'ok'). --spec(unset_env/1 :: (atom()) -> 'ok'). -endif. @@ -519,14 +514,3 @@ log_rotation_result(ok, {error, SaslLogError}) -> {error, {cannot_rotate_sasl_logs, SaslLogError}}; log_rotation_result(ok, ok) -> ok. - -get_env(Key) -> - application:get_env(rabbit, Key). - -set_env(Key, Value) -> - application:set_env(rabbit, Key, Value), - rabbit_channel:refresh_config_all(). - -unset_env(Key) -> - application:unset_env(rabbit, Key), - rabbit_channel:refresh_config_all(). diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 3cf73e80..fa7e3a5a 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -32,6 +32,9 @@ ({ok, rabbit_router:routing_result(), [pid()]} | rabbit_types:error('not_found'))). +-type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())). +-type(body_input() :: (binary() | [binary()])). + -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). -spec(delivery/5 :: @@ -48,14 +51,14 @@ -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). -spec(publish/4 :: - (rabbit_exchange:name(), rabbit_router:routing_key(), - properties_input(), binary()) -> publish_result()). + (exchange_input(), rabbit_router:routing_key(), properties_input(), + body_input()) -> publish_result()). -spec(publish/7 :: - (rabbit_exchange:name(), rabbit_router:routing_key(), - boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), - properties_input(), binary()) -> publish_result()). --spec(build_content/2 :: (rabbit_framing:amqp_property_record(), binary()) -> - rabbit_types:content()). + (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(), + rabbit_types:maybe(rabbit_types:txn()), properties_input(), + body_input()) -> publish_result()). +-spec(build_content/2 :: (rabbit_framing:amqp_property_record(), + binary() | [binary()]) -> rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> {rabbit_framing:amqp_property_record(), binary()}). @@ -66,18 +69,18 @@ publish(Delivery = #delivery{ message = #basic_message{exchange_name = ExchangeName}}) -> case rabbit_exchange:lookup(ExchangeName) of - {ok, X} -> - {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery), - {ok, RoutingRes, DeliveredQPids}; - Other -> - Other + {ok, X} -> publish(X, Delivery); + Other -> Other end. delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, sender = self(), message = Message, msg_seq_no = MsgSeqNo}. -build_content(Properties, BodyBin) -> +build_content(Properties, BodyBin) when is_binary(BodyBin) -> + build_content(Properties, [BodyBin]); + +build_content(Properties, PFR) -> %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 {ClassId, _MethodId} = rabbit_framing_amqp_0_9_1:method_id('basic.publish'), @@ -85,7 +88,7 @@ build_content(Properties, BodyBin) -> properties = Properties, properties_bin = none, protocol = none, - payload_fragments_rev = [BodyBin]}. + payload_fragments_rev = PFR}. from_content(Content) -> #content{class_id = ClassId, @@ -126,9 +129,9 @@ message(ExchangeName, RoutingKey, {error, _Reason} = Error -> Error end. -message(ExchangeName, RoutingKey, RawProperties, BodyBin) -> +message(ExchangeName, RoutingKey, RawProperties, Body) -> Properties = properties(RawProperties), - Content = build_content(Properties, BodyBin), + Content = build_content(Properties, Body), {ok, Msg} = message(ExchangeName, RoutingKey, Content), Msg. @@ -153,18 +156,26 @@ indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1). %% Convenience function, for avoiding round-trips in calls across the %% erlang distributed network. -publish(ExchangeName, RoutingKeyBin, Properties, BodyBin) -> - publish(ExchangeName, RoutingKeyBin, false, false, none, Properties, - BodyBin). +publish(Exchange, RoutingKeyBin, Properties, Body) -> + publish(Exchange, RoutingKeyBin, false, false, none, Properties, + Body). %% Convenience function, for avoiding round-trips in calls across the %% erlang distributed network. -publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties, - BodyBin) -> - publish(delivery(Mandatory, Immediate, Txn, - message(ExchangeName, RoutingKeyBin, - properties(Properties), BodyBin), - undefined)). +publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Txn, + Props, Body) -> + publish(X, delivery(Mandatory, Immediate, Txn, + message(XName, RKey, properties(Props), Body), + undefined)); +publish(XName, RKey, Mandatory, Immediate, Txn, Props, Body) -> + case rabbit_exchange:lookup(XName) of + {ok, X} -> publish(X, RKey, Mandatory, Immediate, Txn, Props, Body); + Err -> Err + end. + +publish(X, Delivery) -> + {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery), + {ok, RoutingRes, DeliveredQPids}. is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 5a013711..8172f804 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -221,9 +221,10 @@ action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> Inform("Deleting vhost ~p", Args), call(Node, {rabbit_vhost, delete, Args}); -action(list_vhosts, Node, [], _Opts, Inform) -> +action(list_vhosts, Node, Args, _Opts, Inform) -> Inform("Listing vhosts", []), - display_list(call(Node, {rabbit_vhost, list, []})); + ArgAtoms = default_if_empty(Args, [name]), + display_info_list(call(Node, {rabbit_vhost, info_all, []}), ArgAtoms); action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) -> Inform("Listing permissions for user ~p", Args), @@ -282,18 +283,15 @@ action(list_consumers, Node, _Args, Opts, Inform) -> Other -> Other end; -action(set_env, Node, [Var, Term], _Opts, Inform) -> - Inform("Setting control variable ~s for node ~p to ~s", [Var, Node, Term]), - rpc_call(Node, rabbit, set_env, [parse(Var), parse(Term)]); - -action(get_env, Node, [Var], _Opts, Inform) -> - Inform("Getting control variable ~s for node ~p", [Var, Node]), - Val = rpc_call(Node, rabbit, get_env, [parse(Var)]), - io:format("~p~n", [Val]); +action(trace_on, Node, [], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Inform("Starting tracing for vhost ~p", [VHost]), + rpc_call(Node, rabbit_trace, start, [list_to_binary(VHost)]); -action(unset_env, Node, [Var], _Opts, Inform) -> - Inform("Clearing control variable ~s for node ~p", [Var, Node]), - rpc_call(Node, rabbit, unset_env, [parse(Var)]); +action(trace_off, Node, [], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Inform("Stopping tracing for vhost ~p", [VHost]), + rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]); action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), @@ -338,11 +336,6 @@ default_if_empty(List, Default) when is_list(List) -> true -> [list_to_atom(X) || X <- List] end. -parse(Str) -> - {ok, Tokens, _} = erl_scan:string(Str ++ "."), - {ok, Term} = erl_parse:parse_term(Tokens), - Term. - display_info_list(Results, InfoItemKeys) when is_list(Results) -> lists:foreach( fun (Result) -> display_row( diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index 09f2545d..7d36856a 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -16,60 +16,83 @@ -module(rabbit_trace). --export([init/1, tap_trace_in/2, tap_trace_out/2]). +-export([init/1, tracing/1, tap_trace_in/2, tap_trace_out/2, start/1, stop/1]). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). +-define(TRACE_VHOSTS, trace_vhosts). +-define(XNAME, <<"amq.rabbitmq.trace">>). + %%---------------------------------------------------------------------------- -ifdef(use_specs). --type(state() :: rabbit_exchange:name() | 'none'). +-type(state() :: rabbit_types:exchange() | 'none'). -spec(init/1 :: (rabbit_types:vhost()) -> state()). +-spec(tracing/1 :: (rabbit_types:vhost()) -> boolean()). -spec(tap_trace_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok'). -spec(tap_trace_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok'). +-spec(start/1 :: (rabbit_types:vhost()) -> 'ok'). +-spec(stop/1 :: (rabbit_types:vhost()) -> 'ok'). + -endif. %%---------------------------------------------------------------------------- init(VHost) -> - case application:get_env(rabbit, trace_exchanges) of - undefined -> none; - {ok, XNs} -> case proplists:get_value(VHost, XNs, none) of - none -> none; - Name -> rabbit_misc:r(VHost, exchange, Name) - end + case tracing(VHost) of + false -> none; + true -> {ok, X} = rabbit_exchange:lookup( + rabbit_misc:r(VHost, exchange, ?XNAME)), + X end. +tracing(VHost) -> + {ok, VHosts} = application:get_env(rabbit, ?TRACE_VHOSTS), + lists:member(VHost, VHosts). + tap_trace_in(Msg = #basic_message{exchange_name = #resource{name = XName}}, - TraceXN) -> - maybe_trace(TraceXN, Msg, <<"publish">>, XName, []). + TraceX) -> + maybe_trace(TraceX, Msg, <<"publish">>, XName, []). tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}, - TraceXN) -> + TraceX) -> RedeliveredNum = case Redelivered of true -> 1; false -> 0 end, - maybe_trace(TraceXN, Msg, <<"deliver">>, QName, + maybe_trace(TraceX, Msg, <<"deliver">>, QName, [{<<"redelivered">>, signedint, RedeliveredNum}]). +%%---------------------------------------------------------------------------- + +start(VHost) -> + update_config(fun (VHosts) -> [VHost | VHosts -- [VHost]] end). + +stop(VHost) -> + update_config(fun (VHosts) -> VHosts -- [VHost] end). + +update_config(Fun) -> + {ok, VHosts0} = application:get_env(rabbit, ?TRACE_VHOSTS), + VHosts = Fun(VHosts0), + application:set_env(rabbit, ?TRACE_VHOSTS, VHosts), + rabbit_channel:refresh_config_all(), + ok. + +%%---------------------------------------------------------------------------- + maybe_trace(none, _Msg, _RKPrefix, _RKSuffix, _Extra) -> ok; -maybe_trace(XName, #basic_message{exchange_name = #resource{name = XName}}, +maybe_trace(#exchange{name = Name}, #basic_message{exchange_name = Name}, _RKPrefix, _RKSuffix, _Extra) -> ok; -maybe_trace(XName, Msg = #basic_message{content = #content{ - payload_fragments_rev = PFR}}, +maybe_trace(X, Msg = #basic_message{content = #content{ + payload_fragments_rev = PFR}}, RKPrefix, RKSuffix, Extra) -> - case rabbit_basic:publish(XName, - <<RKPrefix/binary, ".", RKSuffix/binary>>, - #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, - list_to_binary(lists:reverse(PFR))) of - {ok, _, _} -> ok; - {error, not_found} -> rabbit_log:info("trace ~s not found~n", - [rabbit_misc:rs(XName)]) - end. + {ok, _, _} = rabbit_basic:publish( + X, <<RKPrefix/binary, ".", RKSuffix/binary>>, + #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR), + ok. msg_to_table(#basic_message{exchange_name = #resource{name = XName}, routing_keys = RoutingKeys, diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 31bbb929..bead388d 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -28,6 +28,7 @@ -rabbit_upgrade({topic_trie, mnesia, []}). -rabbit_upgrade({semi_durable_route, mnesia, []}). -rabbit_upgrade({exchange_event_serial, mnesia, []}). +-rabbit_upgrade({trace_exchanges, mnesia, []}). %% ------------------------------------------------------------------- @@ -41,6 +42,7 @@ -spec(topic_trie/0 :: () -> 'ok'). -spec(exchange_event_serial/0 :: () -> 'ok'). -spec(semi_durable_route/0 :: () -> 'ok'). +-spec(trace_exchanges/0 :: () -> 'ok'). -endif. @@ -113,6 +115,12 @@ exchange_event_serial() -> create(rabbit_exchange_serial, [{record_name, exchange_serial}, {attributes, [name, next]}]). +trace_exchanges() -> + [declare_exchange( + rabbit_misc:r(VHost, exchange, <<"amq.rabbitmq.trace">>), topic) || + VHost <- rabbit_vhost:list()], + ok. + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> @@ -129,3 +137,16 @@ transform(TableName, Fun, FieldList, NewRecordName) -> create(Tab, TabDef) -> {atomic, ok} = mnesia:create_table(Tab, TabDef), ok. + +%% Dumb replacement for rabbit_exchange:declare that does not require +%% the exchange type registry or worker pool to be running by dint of +%% not validating anything and assuming the exchange type does not +%% require serialisation. +declare_exchange(XName, Type) -> + X = #exchange{name = XName, + type = Type, + durable = true, + auto_delete = false, + internal = false, + arguments = []}, + ok = mnesia:dirty_write(rabbit_durable_exchange, X). diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 24c130ed..5270d80b 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -21,6 +21,7 @@ %%---------------------------------------------------------------------------- -export([add/1, delete/1, exists/1, list/0, with/2]). +-export([info/1, info/2, info_all/0, info_all/1]). -ifdef(use_specs). @@ -30,10 +31,18 @@ -spec(list/0 :: () -> [rabbit_types:vhost()]). -spec(with/2 :: (rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A). +-spec(info/1 :: (rabbit_types:vhost()) -> rabbit_types:infos()). +-spec(info/2 :: (rabbit_types:vhost(), rabbit_types:info_keys()) + -> rabbit_types:infos()). +-spec(info_all/0 :: () -> [rabbit_types:infos()]). +-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). + -endif. %%---------------------------------------------------------------------------- +-define(INFO_KEYS, [name, tracing]). + add(VHostPath) -> R = rabbit_misc:execute_mnesia_transaction( fun () -> @@ -51,12 +60,13 @@ add(VHostPath) -> rabbit_misc:r(VHostPath, exchange, Name), Type, true, false, false, []) || {Name,Type} <- - [{<<"">>, direct}, - {<<"amq.direct">>, direct}, - {<<"amq.topic">>, topic}, - {<<"amq.match">>, headers}, %% per 0-9-1 pdf - {<<"amq.headers">>, headers}, %% per 0-9-1 xml - {<<"amq.fanout">>, fanout}]], + [{<<"">>, direct}, + {<<"amq.direct">>, direct}, + {<<"amq.topic">>, topic}, + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml + {<<"amq.fanout">>, fanout}, + {<<"amq.rabbitmq.trace">>, topic}]], ok end), rabbit_log:info("Added vhost ~p~n", [VHostPath]), @@ -104,3 +114,17 @@ with(VHostPath, Thunk) -> Thunk() end end. + +%%---------------------------------------------------------------------------- + +infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. + +i(name, VHost) -> VHost; +i(tracing, VHost) -> rabbit_trace:tracing(VHost); +i(Item, _) -> throw({bad_argument, Item}). + +info(VHost) -> infos(?INFO_KEYS, VHost). +info(VHost, Items) -> infos(Items, VHost). + +info_all() -> info_all(?INFO_KEYS). +info_all(Items) -> [info(VHost, Items) || VHost <- list()]. |