diff options
-rw-r--r-- | src/rabbit_channel.erl | 20 |
1 files changed, 10 insertions, 10 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index cee8d761..d2f55277 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -191,7 +191,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, most_recently_declared_queue = <<>>, queue_monitors = dict:new(), consumer_mapping = dict:new(), - blocking = gb_sets:new(), + blocking = sets:new(), queue_consumers = dict:new(), queue_collector_pid = CollectorPid, stats_timer = StatsTimer, @@ -515,14 +515,14 @@ check_name(_Kind, NameBin) -> NameBin. queue_blocked(QPid, State = #ch{blocking = Blocking}) -> - case gb_sets:is_element(QPid, Blocking) of + case sets:is_element(QPid, Blocking) of false -> State; - true -> Blocking1 = gb_sets:delete(QPid, Blocking), - ok = case gb_sets:is_empty(Blocking1) of - true -> rabbit_writer:send_command( - State#ch.writer_pid, - #'channel.flow_ok'{active = false}); - false -> ok + true -> Blocking1 = sets:del_element(QPid, Blocking), + ok = case sets:size(Blocking1) of + 0 -> rabbit_writer:send_command( + State#ch.writer_pid, + #'channel.flow_ok'{active = false}); + _ -> ok end, demonitor_queue(QPid, State#ch{blocking = Blocking1}) end. @@ -1114,7 +1114,7 @@ handle_method(#'channel.flow'{active = false}, _, [] -> {reply, #'channel.flow_ok'{active = false}, State1}; QPids -> State2 = lists:foldl(fun monitor_queue/2, State1#ch{blocking = - gb_sets:from_list(QPids)}, + sets:from_list(QPids)}, QPids), ok = rabbit_amqqueue:flush_all(QPids, self()), {noreply, State2} @@ -1167,7 +1167,7 @@ queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer, unconfirmed_qm = UQM}) -> StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine, ConsumerMonitored = dict:is_key(QPid, QCons), - QueueBlocked = gb_sets:is_element(QPid, Blocking), + QueueBlocked = sets:is_element(QPid, Blocking), ConfirmMonitored = gb_trees:is_defined(QPid, UQM), StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored. |