diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-06-03 12:59:03 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-06-03 12:59:03 +0100 |
commit | 5fb7289dfa678d37893ce956d223929729f1f633 (patch) | |
tree | 3916b39b6bc6b8f22f71cc36096c00f9daa36855 | |
parent | 88e20d03a2c80aeee37df6f50bc610bd2a0ff999 (diff) | |
parent | 7b79de595d6b6fb5db1478072b81c22e5723b999 (diff) | |
download | rabbitmq-server-5fb7289dfa678d37893ce956d223929729f1f633.tar.gz |
Merge in default
-rw-r--r-- | src/rabbit_amqqueue.erl | 11 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 53 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 17 |
3 files changed, 63 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8c00c85c..f6935f25 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,7 +26,7 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, wake_up/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/4, basic_consume/9, basic_cancel/4]). +-export([basic_get/4, basic_consume/10, basic_cancel/4]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). @@ -149,9 +149,9 @@ {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(), boolean()) -> 'ok'). --spec(basic_consume/9 :: +-spec(basic_consume/10 :: (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(), - rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any()) + rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any(), any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). @@ -549,9 +549,10 @@ basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) -> delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}). basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) -> + ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg) -> delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}). + ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, + OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d2f4a178..4c3751ba 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -55,7 +55,7 @@ status }). --record(consumer, {tag, ack_required}). +-record(consumer, {tag, ack_required, args}). %% These are held in our process dictionary -record(cr, {ch_pid, @@ -190,6 +190,10 @@ declare(Recover, From, State = #q{q = Q, recovery_barrier(Recover), State1 = process_args(State#q{backing_queue = BQ, backing_queue_state = BQS}), + case Q#amqqueue.name#resource.name of + <<"test">> -> rabbit_federation_queue:start_link(Q); + _ -> ok + end, rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #q.stats_timer, @@ -422,7 +426,7 @@ deliver_msgs_to_consumers(_DeliverFun, true, State) -> {true, State}; deliver_msgs_to_consumers(DeliverFun, false, State = #q{active_consumers = ActiveConsumers}) -> - case queue:out(ActiveConsumers) of + case pick_consumer(ActiveConsumers) of {empty, _} -> {false, State}; {{value, QEntry}, Tail} -> @@ -436,12 +440,14 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> C = lookup_ch(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), + notify_federation(State), {false, State}; false -> case rabbit_limiter:can_send(C#cr.limiter, Consumer#consumer.ack_required, Consumer#consumer.tag) of {suspend, Limiter} -> block_consumer(C#cr{limiter = Limiter}, E), + notify_federation(State), {false, State}; {continue, Limiter} -> AC1 = queue:in(E, State#q.active_consumers), @@ -523,8 +529,42 @@ run_message_queue(State) -> {_IsEmpty1, State1} = deliver_msgs_to_consumers( fun deliver_from_queue_deliver/2, is_empty(State), State), + notify_federation(State1), State1. +notify_federation(#q{q = Q, + active_consumers = ActiveConsumers, + backing_queue = BQ, + backing_queue_state = BQS}) -> + IsEmpty = BQ:is_empty(BQS), + case IsEmpty andalso active_unfederated(ActiveConsumers) of + true -> rabbit_federation_queue:run(Q); + false -> rabbit_federation_queue:stop(Q) + end. + +active_unfederated(Cs) -> + case queue:out(Cs) of + {empty, _} -> false; + {{value, C}, Cs1} -> case federated_consumer(C) of + true -> active_unfederated(Cs1); + false -> true + end + end. + +%% TODO this could be more efficient. But we'd need another representation, +%% and thus to abstract the representation of active_consumers. +pick_consumer(Cs) -> + case lists:splitwith(fun federated_consumer/1, queue:to_list(Cs)) of + {_, []} -> queue:out(Cs); + {Feds, [UnFed|Tl]} -> queue:out(queue:from_list([UnFed | Feds ++ Tl])) + end. + +federated_consumer({_Pid, #consumer{args = Args}}) -> + case rabbit_misc:table_lookup(Args, <<"x-purpose">>) of + {longstr, <<"federation">>} -> true; + _ -> false + end. + attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -1089,11 +1129,12 @@ handle_call({notify_down, ChPid}, _From, State) -> end; handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, - State = #q{q = #amqqueue{name = QName}}) -> + State = #q{q = Q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), case fetch(AckRequired, State1) of {empty, State2} -> + rabbit_federation_queue:basic_get(Q), reply(empty, State2); {{Message, IsDelivered, AckTag}, State2} -> State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = @@ -1110,7 +1151,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}, + ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg}, _From, State = #q{exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of in_use -> @@ -1134,7 +1175,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, false -> ok end, Consumer = #consumer{tag = ConsumerTag, - ack_required = not NoAck}, + ack_required = not NoAck, + args = OtherArgs}, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; true -> Holder end, @@ -1174,6 +1216,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, active_consumers = remove_consumer( ChPid, ConsumerTag, State#q.active_consumers)}, + notify_federation(State1), case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); true -> stop(ok, State1) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 1de14b5c..481a2173 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -734,7 +734,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_ack = NoAck, exclusive = ExclusiveConsume, nowait = NoWait, - arguments = Arguments}, + arguments = Args}, _, State = #ch{conn_pid = ConnPid, limiter = Limiter, consumer_mapping = ConsumerMapping}) -> @@ -755,12 +755,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin, case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> + {CreditArgs, OtherArgs} = parse_credit_args(Args), {rabbit_amqqueue:basic_consume( Q, NoAck, self(), rabbit_limiter:pid(Limiter), rabbit_limiter:is_active(Limiter), ActualConsumerTag, ExclusiveConsume, - parse_credit_args(Arguments), + CreditArgs, OtherArgs, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), Q} @@ -1218,12 +1219,12 @@ handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> parse_credit_args(Arguments) -> case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of - {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), - rabbit_misc:table_lookup(T, <<"drain">>)} of - {{long, Credit}, {boolean, Drain}} -> {Credit, Drain}; - _ -> none - end; - undefined -> none + {table, T} -> {case {rabbit_misc:table_lookup(T, <<"credit">>), + rabbit_misc:table_lookup(T, <<"drain">>)} of + {{long, Credit}, {bool, Drain}} -> {Credit, Drain}; + _ -> none + end, lists:keydelete(<<"x-credit">>, 1, Arguments)}; + undefined -> {none, Arguments} end. binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, |