summaryrefslogtreecommitdiff
path: root/src/rabbit_channel.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r--src/rabbit_channel.erl15
1 files changed, 9 insertions, 6 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index bfd779ee..b9782b2b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -264,7 +264,8 @@ handle_cast({deliver, ConsumerTag, AckRequired,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}}},
State = #ch{writer_pid = WriterPid,
- next_tag = DeliveryTag}) ->
+ next_tag = DeliveryTag,
+ user = User}) ->
State1 = lock_message(AckRequired,
ack_record(DeliveryTag, ConsumerTag, Msg),
State),
@@ -281,7 +282,7 @@ handle_cast({deliver, ConsumerTag, AckRequired,
true -> deliver;
false -> deliver_no_ack
end, State),
- rabbit_trace:tap_trace_out(Msg, ConsumerTag),
+ rabbit_trace:tap_trace_out(Msg, ConsumerTag, User),
noreply(State1#ch{next_tag = DeliveryTag + 1});
handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
@@ -588,7 +589,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
immediate = Immediate},
Content, State = #ch{virtual_host = VHostPath,
transaction_id = TxnKey,
- confirm_enabled = ConfirmEnabled}) ->
+ confirm_enabled = ConfirmEnabled,
+ user = User}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -605,7 +607,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
- rabbit_trace:tap_trace_in(Message),
+ rabbit_trace:tap_trace_in(Message, User),
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
@@ -655,7 +657,8 @@ handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
_, State = #ch{writer_pid = WriterPid,
conn_pid = ConnPid,
- next_tag = DeliveryTag}) ->
+ next_tag = DeliveryTag,
+ user = User}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
@@ -674,7 +677,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
true -> get_no_ack;
false -> get
end, State),
- rabbit_trace:tap_trace_out(Msg, none),
+ rabbit_trace:tap_trace_out(Msg, none, User),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,