diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 36 |
1 files changed, 18 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 24de9415..21541541 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -46,7 +46,7 @@ rate_timer_ref, expiry_timer_ref, stats_timer, - guid_to_channel, + msg_id_to_channel, ttl, ttl_timer_ref }). @@ -112,7 +112,7 @@ init(Q) -> expiry_timer_ref = undefined, ttl = undefined, stats_timer = rabbit_event:init_stats_timer(), - guid_to_channel = dict:new()}, hibernate, + msg_id_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -399,21 +399,21 @@ deliver_from_queue_deliver(AckRequired, false, State) -> fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. -confirm_messages(Guids, State = #q{guid_to_channel = GTC}) -> - {CMs, GTC1} = lists:foldl( - fun(Guid, {CMs, GTC0}) -> - case dict:find(Guid, GTC0) of +confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> + {CMs, MTC1} = lists:foldl( + fun(Guid, {CMs, MTC0}) -> + case dict:find(Guid, MTC0) of {ok, {ChPid, MsgSeqNo}} -> {gb_trees_cons(ChPid, MsgSeqNo, CMs), - dict:erase(Guid, GTC0)}; + dict:erase(Guid, MTC0)}; _ -> - {CMs, GTC0} + {CMs, MTC0} end - end, {gb_trees:empty(), GTC}, Guids), + end, {gb_trees:empty(), MTC}, MsgIds), gb_trees:map(fun(ChPid, MsgSeqNos) -> rabbit_channel:confirm(ChPid, MsgSeqNos) end, CMs), - State#q{guid_to_channel = GTC1}. + State#q{msg_id_to_channel = MTC1}. gb_trees_cons(Key, Value, Tree) -> case gb_trees:lookup(Key, Tree) of @@ -427,12 +427,12 @@ record_confirm_message(#delivery{sender = ChPid, msg_seq_no = MsgSeqNo, message = #basic_message { is_persistent = true, - guid = Guid}}, + id = MsgId}}, State = - #q{guid_to_channel = GTC, - q = #amqqueue{durable = true}}) -> + #q{msg_id_to_channel = MTC, + q = #amqqueue{durable = true}}) -> {confirm, - State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}}; + State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)}}; record_confirm_message(_Delivery, State) -> {no_confirm, State}. @@ -609,9 +609,9 @@ backing_queue_idle_timeout(State = #q{backing_queue = BQ}) -> fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State). maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - {Guids, BQS1} = Fun(BQS), + {MsgIds, BQS1} = Fun(BQS), run_message_queue( - confirm_messages(Guids, State#q{backing_queue_state = BQS1})). + confirm_messages(MsgIds, State#q{backing_queue_state = BQS1})). commit_transaction(Txn, From, C = #cr{acktags = ChAckTags}, State = #q{backing_queue = BQ, @@ -757,8 +757,8 @@ prioritise_cast(Msg, _State) -> maybe_expire -> 8; drop_expired -> 8; emit_stats -> 7; - {ack, _Txn, _MsgIds, _ChPid} -> 7; - {reject, _MsgIds, _Requeue, _ChPid} -> 7; + {ack, _Txn, _AckTags, _ChPid} -> 7; + {reject, _AckTags, _Requeue, _ChPid} -> 7; {notify_sent, _ChPid} -> 7; {unblock, _ChPid} -> 7; {maybe_run_queue_via_backing_queue, _Fun} -> 6; |