diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-04-11 13:21:38 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-04-11 13:21:38 +0100 |
commit | 6b4a7674c47bf6ec1df20eb428bcfe2916c26b7d (patch) | |
tree | e47cbcabb1068a1c4c97f297602d532302bf9ffd /src | |
parent | a35d2e51754efed7458c54213e5df92248c1b43f (diff) | |
download | rabbitmq-server-6b4a7674c47bf6ec1df20eb428bcfe2916c26b7d.tar.gz |
Add user to trace messages.
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_channel.erl | 15 | ||||
-rw-r--r-- | src/rabbit_trace.erl | 28 |
2 files changed, 26 insertions, 17 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index bfd779ee..b9782b2b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -264,7 +264,8 @@ handle_cast({deliver, ConsumerTag, AckRequired, routing_keys = [RoutingKey | _CcRoutes], content = Content}}}, State = #ch{writer_pid = WriterPid, - next_tag = DeliveryTag}) -> + next_tag = DeliveryTag, + user = User}) -> State1 = lock_message(AckRequired, ack_record(DeliveryTag, ConsumerTag, Msg), State), @@ -281,7 +282,7 @@ handle_cast({deliver, ConsumerTag, AckRequired, true -> deliver; false -> deliver_no_ack end, State), - rabbit_trace:tap_trace_out(Msg, ConsumerTag), + rabbit_trace:tap_trace_out(Msg, ConsumerTag, User), noreply(State1#ch{next_tag = DeliveryTag + 1}); handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> @@ -588,7 +589,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, immediate = Immediate}, Content, State = #ch{virtual_host = VHostPath, transaction_id = TxnKey, - confirm_enabled = ConfirmEnabled}) -> + confirm_enabled = ConfirmEnabled, + user = User}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -605,7 +607,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> - rabbit_trace:tap_trace_in(Message), + rabbit_trace:tap_trace_in(Message, User), {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -655,7 +657,8 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, _, State = #ch{writer_pid = WriterPid, conn_pid = ConnPid, - next_tag = DeliveryTag}) -> + next_tag = DeliveryTag, + user = User}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( @@ -674,7 +677,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, true -> get_no_ack; false -> get end, State), - rabbit_trace:tap_trace_out(Msg, none), + rabbit_trace:tap_trace_out(Msg, none, User), ok = rabbit_writer:send_command( WriterPid, #'basic.get_ok'{delivery_tag = DeliveryTag, diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index eee03165..6dac3cc9 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -16,7 +16,7 @@ -module(rabbit_trace). --export([tap_trace_in/1, tap_trace_out/2]). +-export([tap_trace_in/2, tap_trace_out/3]). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). @@ -25,9 +25,11 @@ -ifdef(use_specs). --spec(tap_trace_in/1 :: (rabbit_types:basic_message()) -> 'ok'). --spec(tap_trace_out/2 :: (rabbit_amqqueue:qmsg(), - rabbit_types:maybe(rabbit_types:ctag())) -> 'ok'). +-spec(tap_trace_in/2 :: (rabbit_types:basic_message(), rabbit_types:user()) + -> 'ok'). +-spec(tap_trace_out/3 :: (rabbit_amqqueue:qmsg(), + rabbit_types:maybe(rabbit_types:ctag()), + rabbit_types:user()) -> 'ok'). -endif. @@ -35,12 +37,13 @@ tap_trace_in(Message = #basic_message{ exchange_name = #resource{virtual_host = VHostBin, - name = XNameBin}}) -> + name = XNameBin}}, + User) -> check_trace( XNameBin, VHostBin, fun (TraceExchangeBin) -> - {EncodedMetadata, Payload} = message_to_table(Message), + {EncodedMetadata, Payload} = message_to_table(Message, User), publish(TraceExchangeBin, VHostBin, <<"publish">>, XNameBin, EncodedMetadata, Payload) end). @@ -49,13 +52,14 @@ tap_trace_out({#resource{name = QNameBin}, _QPid, _QMsgId, Redelivered, Message = #basic_message{ exchange_name = #resource{virtual_host = VHostBin, name = XNameBin}}}, - ConsumerTagOrNone) -> + ConsumerTagOrNone, + User) -> check_trace( XNameBin, VHostBin, fun (TraceExchangeBin) -> RedeliveredNum = case Redelivered of true -> 1; false -> 0 end, - {EncodedMetadata, Payload} = message_to_table(Message), + {EncodedMetadata, Payload} = message_to_table(Message, User), Fields0 = [{<<"redelivered">>, signedint, RedeliveredNum}] ++ EncodedMetadata, Fields = case ConsumerTagOrNone of @@ -84,8 +88,9 @@ publish(TraceExchangeBin, VHostBin, RKPrefix, RKSuffix, Table, Payload) -> ok. message_to_table(#basic_message{exchange_name = #resource{name = XName}, - routing_keys = RoutingKeys, - content = Content}) -> + routing_keys = RoutingKeys, + content = Content}, + #user{username = Username}) -> #content{properties = Props, payload_fragments_rev = PFR} = rabbit_binary_parser:ensure_content_decoded(Content), @@ -99,7 +104,8 @@ message_to_table(#basic_message{exchange_name = #resource{name = XName}, end, {NewL, Ix + 1} end, {[], 2}, record_info(fields, 'P_basic')), - {[{<<"exchange_name">>, longstr, XName}, + {[{<<"username">>, longstr, Username}, + {<<"exchange_name">>, longstr, XName}, {<<"routing_keys">>, array, [{longstr, K} || K <- RoutingKeys]}, {<<"properties">>, table, PropsTable}, {<<"node">>, longstr, a2b(node())}], |