diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-05-24 14:12:09 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-05-24 14:12:09 +0100 |
commit | 7b79de595d6b6fb5db1478072b81c22e5723b999 (patch) | |
tree | 8624246108bc60bfb364002ec05313839921dd84 | |
parent | 89b3be6ef4138452869b82324c582a1aff72c6c8 (diff) | |
download | rabbitmq-server-7b79de595d6b6fb5db1478072b81c22e5723b999.tar.gz |
If we have local active consumers, ignore federated ones.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 27 |
1 files changed, 20 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 71dbb9de..4c3751ba 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -426,7 +426,7 @@ deliver_msgs_to_consumers(_DeliverFun, true, State) -> {true, State}; deliver_msgs_to_consumers(DeliverFun, false, State = #q{active_consumers = ActiveConsumers}) -> - case queue:out(ActiveConsumers) of + case pick_consumer(ActiveConsumers) of {empty, _} -> {false, State}; {{value, QEntry}, Tail} -> @@ -544,12 +544,25 @@ notify_federation(#q{q = Q, active_unfederated(Cs) -> case queue:out(Cs) of - {empty, _} -> false; - {{value, {_Pid, #consumer{args = Args}}}, Cs1} -> - case rabbit_misc:table_lookup(Args, <<"x-purpose">>) of - {longstr, <<"federation">>} -> active_unfederated(Cs1); - _ -> true - end + {empty, _} -> false; + {{value, C}, Cs1} -> case federated_consumer(C) of + true -> active_unfederated(Cs1); + false -> true + end + end. + +%% TODO this could be more efficient. But we'd need another representation, +%% and thus to abstract the representation of active_consumers. +pick_consumer(Cs) -> + case lists:splitwith(fun federated_consumer/1, queue:to_list(Cs)) of + {_, []} -> queue:out(Cs); + {Feds, [UnFed|Tl]} -> queue:out(queue:from_list([UnFed | Feds ++ Tl])) + end. + +federated_consumer({_Pid, #consumer{args = Args}}) -> + case rabbit_misc:table_lookup(Args, <<"x-purpose">>) of + {longstr, <<"federation">>} -> true; + _ -> false end. attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, |