summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-05-23 12:53:08 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-05-23 12:53:08 +0100
commit9009c8a625731314a29ab031c9a3a53e4d9e177c (patch)
tree624a308d18867aaaa4b229d2059b67451a005a3e
parente4f2fa497a7b209d67bf9cc7208f1f48a997cadf (diff)
downloadrabbitmq-server-9009c8a625731314a29ab031c9a3a53e4d9e177c.tar.gz
Distinguish between federated consumers and others.
-rw-r--r--src/rabbit_amqqueue.erl11
-rw-r--r--src/rabbit_amqqueue_process.erl19
-rw-r--r--src/rabbit_channel.erl17
3 files changed, 30 insertions, 17 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8c00c85c..f6935f25 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,7 +26,7 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, wake_up/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/4, basic_consume/9, basic_cancel/4]).
+-export([basic_get/4, basic_consume/10, basic_cancel/4]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
@@ -149,9 +149,9 @@
{'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(),
non_neg_integer(), boolean()) -> 'ok').
--spec(basic_consume/9 ::
+-spec(basic_consume/10 ::
(rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
- rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any())
+ rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any(), any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
@@ -549,9 +549,10 @@ basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) ->
+ ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg) ->
delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}).
+ ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs,
+ OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0fda5b8e..6b6bfa92 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -55,7 +55,7 @@
status
}).
--record(consumer, {tag, ack_required}).
+-record(consumer, {tag, ack_required, args}).
%% These are held in our process dictionary
-record(cr, {ch_pid,
@@ -527,12 +527,22 @@ run_message_queue(State = #q{q = Q}) ->
{IsEmpty1, State1} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
is_empty(State), State),
- case queue:len(State1#q.active_consumers) =/= 0 andalso IsEmpty1 of
+ case IsEmpty1 andalso active_unfederated(State1#q.active_consumers) of
true -> rabbit_federation_queue:go(Q);
false -> rabbit_federation_queue:stop(Q)
end,
State1.
+active_unfederated(Cs) ->
+ case queue:out(Cs) of
+ {empty, _} -> false;
+ {{value, {_Pid, #consumer{args = Args}}}, Cs1} ->
+ case rabbit_misc:table_lookup(Args, <<"x-purpose">>) of
+ {longstr, <<"federation">>} -> active_unfederated(Cs1);
+ _ -> true
+ end
+ end.
+
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -1118,7 +1128,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg},
+ ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg},
_From, State = #q{exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
@@ -1142,7 +1152,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
false -> ok
end,
Consumer = #consumer{tag = ConsumerTag,
- ack_required = not NoAck},
+ ack_required = not NoAck,
+ args = OtherArgs},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> Holder
end,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 69836204..a281f7cb 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -734,7 +734,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait,
- arguments = Arguments},
+ arguments = Args},
_, State = #ch{conn_pid = ConnPid,
limiter = Limiter,
consumer_mapping = ConsumerMapping}) ->
@@ -755,12 +755,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) ->
+ {CreditArgs, OtherArgs} = parse_credit_args(Args),
{rabbit_amqqueue:basic_consume(
Q, NoAck, self(),
rabbit_limiter:pid(Limiter),
rabbit_limiter:is_active(Limiter),
ActualConsumerTag, ExclusiveConsume,
- parse_credit_args(Arguments),
+ CreditArgs, OtherArgs,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
Q}
@@ -1217,12 +1218,12 @@ handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
parse_credit_args(Arguments) ->
case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of
- {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
- rabbit_misc:table_lookup(T, <<"drain">>)} of
- {{long, Credit}, {bool, Drain}} -> {Credit, Drain};
- _ -> none
- end;
- undefined -> none
+ {table, T} -> {case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, Credit}, {bool, Drain}} -> {Credit, Drain};
+ _ -> none
+ end, lists:keydelete(<<"x-credit">>, 1, Arguments)};
+ undefined -> {none, Arguments}
end.
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,