summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl36
1 files changed, 18 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 24de9415..21541541 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -46,7 +46,7 @@
rate_timer_ref,
expiry_timer_ref,
stats_timer,
- guid_to_channel,
+ msg_id_to_channel,
ttl,
ttl_timer_ref
}).
@@ -112,7 +112,7 @@ init(Q) ->
expiry_timer_ref = undefined,
ttl = undefined,
stats_timer = rabbit_event:init_stats_timer(),
- guid_to_channel = dict:new()}, hibernate,
+ msg_id_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
@@ -399,21 +399,21 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
-confirm_messages(Guids, State = #q{guid_to_channel = GTC}) ->
- {CMs, GTC1} = lists:foldl(
- fun(Guid, {CMs, GTC0}) ->
- case dict:find(Guid, GTC0) of
+confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
+ {CMs, MTC1} = lists:foldl(
+ fun(Guid, {CMs, MTC0}) ->
+ case dict:find(Guid, MTC0) of
{ok, {ChPid, MsgSeqNo}} ->
{gb_trees_cons(ChPid, MsgSeqNo, CMs),
- dict:erase(Guid, GTC0)};
+ dict:erase(Guid, MTC0)};
_ ->
- {CMs, GTC0}
+ {CMs, MTC0}
end
- end, {gb_trees:empty(), GTC}, Guids),
+ end, {gb_trees:empty(), MTC}, MsgIds),
gb_trees:map(fun(ChPid, MsgSeqNos) ->
rabbit_channel:confirm(ChPid, MsgSeqNos)
end, CMs),
- State#q{guid_to_channel = GTC1}.
+ State#q{msg_id_to_channel = MTC1}.
gb_trees_cons(Key, Value, Tree) ->
case gb_trees:lookup(Key, Tree) of
@@ -427,12 +427,12 @@ record_confirm_message(#delivery{sender = ChPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
is_persistent = true,
- guid = Guid}},
+ id = MsgId}},
State =
- #q{guid_to_channel = GTC,
- q = #amqqueue{durable = true}}) ->
+ #q{msg_id_to_channel = MTC,
+ q = #amqqueue{durable = true}}) ->
{confirm,
- State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}};
+ State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)}};
record_confirm_message(_Delivery, State) ->
{no_confirm, State}.
@@ -609,9 +609,9 @@ backing_queue_idle_timeout(State = #q{backing_queue = BQ}) ->
fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State).
maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- {Guids, BQS1} = Fun(BQS),
+ {MsgIds, BQS1} = Fun(BQS),
run_message_queue(
- confirm_messages(Guids, State#q{backing_queue_state = BQS1})).
+ confirm_messages(MsgIds, State#q{backing_queue_state = BQS1})).
commit_transaction(Txn, From, C = #cr{acktags = ChAckTags},
State = #q{backing_queue = BQ,
@@ -757,8 +757,8 @@ prioritise_cast(Msg, _State) ->
maybe_expire -> 8;
drop_expired -> 8;
emit_stats -> 7;
- {ack, _Txn, _MsgIds, _ChPid} -> 7;
- {reject, _MsgIds, _Requeue, _ChPid} -> 7;
+ {ack, _Txn, _AckTags, _ChPid} -> 7;
+ {reject, _AckTags, _Requeue, _ChPid} -> 7;
{notify_sent, _ChPid} -> 7;
{unblock, _ChPid} -> 7;
{maybe_run_queue_via_backing_queue, _Fun} -> 6;