diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-07-05 11:11:46 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-07-05 11:11:46 +0100 |
commit | e0af641ee62b45ba88353a77a7026f365a9aa234 (patch) | |
tree | 6bb88b9a2347f6586a36687ee139c2b21758ee68 | |
parent | 15d7f14ef94da323b073b8bf0fe926e272c5f3e4 (diff) | |
parent | 863982ef29a588777c88359fa7ab1a76bc324c4a (diff) | |
download | rabbitmq-server-e0af641ee62b45ba88353a77a7026f365a9aa234.tar.gz |
Merge in default
-rw-r--r-- | src/priority_queue.erl | 35 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 16 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 92 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 17 |
4 files changed, 116 insertions, 44 deletions
diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 18e1e8d9..8ded389b 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -40,8 +40,8 @@ -module(priority_queue). --export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, - out/1, join/2]). +-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, from_list/1, + in/2, in/3, out/1, join/2, filter/2, fold/3, highest/1]). %%---------------------------------------------------------------------------- @@ -59,10 +59,14 @@ -spec(is_empty/1 :: (pqueue()) -> boolean()). -spec(len/1 :: (pqueue()) -> non_neg_integer()). -spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]). +-spec(from_list/1 :: ([{priority(), any()}]) -> pqueue()). -spec(in/2 :: (any(), pqueue()) -> pqueue()). -spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). -spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}). -spec(join/2 :: (pqueue(), pqueue()) -> pqueue()). +-spec(filter/2 :: (fun ((any()) -> boolean()), pqueue()) -> pqueue()). +-spec(fold/3 :: (fun ((any(), any()) -> any()), any(), pqueue()) -> any()). +-spec(highest/1 :: (pqueue()) -> priority()). -endif. @@ -96,6 +100,9 @@ to_list({pqueue, Queues}) -> [{maybe_negate_priority(P), V} || {P, Q} <- Queues, {0, V} <- to_list(Q)]. +from_list(L) -> + lists:foldl(fun ({P, E}, Q) -> in(E, P, Q) end, new(), L). + in(Item, Q) -> in(Item, 0, Q). @@ -147,6 +154,14 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. +out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0); +out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)). + +add_p(R, P) -> case R of + {empty, Q} -> {empty, Q}; + {{value, V}, Q} -> {{value, V, P}, Q} + end. + join(A, {queue, [], [], 0}) -> A; join({queue, [], [], 0}, B) -> @@ -185,6 +200,22 @@ merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity -> merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) -> merge(As, Bs, [ {PB, B} | Acc ]). +filter(Pred, Q) -> fold(fun(V, P, Acc) -> + case Pred(V) of + true -> in(V, P, Acc); + false -> Acc + end + end, new(), Q). + +fold(Fun, Init, Q) -> case out_p(Q) of + {empty, _Q} -> Init; + {{value, V, P}, Q1} -> fold(Fun, Fun(V, P, Init), Q1) + end. + +highest({queue, [], [], 0}) -> exit(highest_priority_of_empty_queue); +highest({queue, _, _, _}) -> 0; +highest({pqueue, [{P, _} | _]}) -> maybe_negate_priority(P). + r2f([], 0) -> {queue, [], [], 0}; r2f([_] = R, 1) -> {queue, [], R, 1}; r2f([X,Y], 2) -> {queue, [X], [Y], 2}; diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 767abeb0..7004a353 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,7 +26,7 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, wake_up/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/4, basic_consume/9, basic_cancel/4]). +-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_federation/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). @@ -149,12 +149,13 @@ {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(), boolean()) -> 'ok'). --spec(basic_consume/9 :: +-spec(basic_consume/10 :: (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(), - rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any()) + rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any(), any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). +-spec(notify_federation/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(resume/2 :: (pid(), pid()) -> 'ok'). @@ -296,6 +297,7 @@ store_queue(Q = #amqqueue{durable = false}) -> policy_changed(Q1, Q2) -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2), + rabbit_federation_queue:policy_changed(Q1, Q2), %% Make sure we emit a stats event even if nothing %% mirroring-related has changed - the policy may have changed anyway. wake_up(Q1). @@ -554,13 +556,17 @@ basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) -> delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}). basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) -> + ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg) -> delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}). + ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, + OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). +notify_federation(#amqqueue{pid = QPid}) -> + delegate:cast(QPid, notify_federation). + notify_sent(QPid, ChPid) -> Key = {consumer_credit_to, QPid}, put(Key, case get(Key) of diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5409a806..c4face1f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -55,7 +55,7 @@ status }). --record(consumer, {tag, ack_required}). +-record(consumer, {tag, ack_required, args}). %% These are held in our process dictionary -record(cr, {ch_pid, @@ -145,7 +145,7 @@ init_state(Q) -> State = #q{q = Q, exclusive_consumer = none, has_had_consumers = false, - active_consumers = queue:new(), + active_consumers = priority_queue:new(), senders = pmon:new(), msg_id_to_channel = gb_trees:empty(), status = running}, @@ -198,6 +198,7 @@ declare(Recover, From, State = #q{q = Q, recovery_barrier(Recover), State1 = process_args(State#q{backing_queue = BQ, backing_queue_state = BQS}), + rabbit_federation_queue:maybe_start(Q), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #q.stats_timer, @@ -264,7 +265,7 @@ init_dlx_routing_key(RoutingKey, State) -> init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}. -terminate_shutdown(Fun, State) -> +terminate_shutdown(Fun, State = #q{q = Q}) -> State1 = #q{backing_queue_state = BQS} = lists:foldl(fun (F, S) -> F(S) end, State, [fun stop_sync_timer/1, @@ -275,6 +276,7 @@ terminate_shutdown(Fun, State) -> undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), QName = qname(State), + rabbit_federation_queue:maybe_stop(Q), [emit_consumer_deleted(Ch, CTag, QName) || {Ch, CTag, _} <- consumers(State1)], State1#q{backing_queue_state = Fun(BQS)} @@ -358,7 +360,7 @@ ensure_stats_timer(State) -> rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats). assert_invariant(State = #q{active_consumers = AC}) -> - true = (queue:is_empty(AC) orelse is_empty(State)). + true = (priority_queue:is_empty(AC) orelse is_empty(State)). is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS). @@ -377,7 +379,7 @@ ch_record(ChPid, LimiterPid) -> monitor_ref = MonitorRef, acktags = queue:new(), consumer_count = 0, - blocked_consumers = queue:new(), + blocked_consumers = priority_queue:new(), limiter = Limiter, unsent_message_count = 0}, put(Key, C), @@ -406,7 +408,8 @@ erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) -> all_ch_record() -> [C || {{ch, _}, C} <- get()]. block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> - update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}). + Blocked1 = priority_queue:in(QEntry, consumer_priority(QEntry), Blocked), + update_ch_record(C#cr{blocked_consumers = Blocked1}). is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). @@ -430,7 +433,7 @@ deliver_msgs_to_consumers(_DeliverFun, true, State) -> {true, State}; deliver_msgs_to_consumers(DeliverFun, false, State = #q{active_consumers = ActiveConsumers}) -> - case queue:out(ActiveConsumers) of + case priority_queue:out(ActiveConsumers) of {empty, _} -> {false, State}; {{value, QEntry}, Tail} -> @@ -444,15 +447,18 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> C = lookup_ch(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), + notify_federation(State), {false, State}; false -> case rabbit_limiter:can_send(C#cr.limiter, Consumer#consumer.ack_required, Consumer#consumer.tag) of {suspend, Limiter} -> block_consumer(C#cr{limiter = Limiter}, E), + notify_federation(State), {false, State}; {continue, Limiter} -> - AC1 = queue:in(E, State#q.active_consumers), + AC1 = priority_queue:in(E, consumer_priority(E), + State#q.active_consumers), deliver_msg_to_consumer( DeliverFun, Consumer, C#cr{limiter = Limiter}, State#q{active_consumers = AC1}) @@ -531,8 +537,28 @@ run_message_queue(State) -> {_IsEmpty1, State1} = deliver_msgs_to_consumers( fun deliver_from_queue_deliver/2, is_empty(State), State), + notify_federation(State1), State1. +notify_federation(#q{q = Q, + active_consumers = ActiveConsumers, + backing_queue = BQ, + backing_queue_state = BQS}) -> + IsEmpty = BQ:is_empty(BQS), + case IsEmpty andalso active_unfederated(ActiveConsumers) of + true -> rabbit_federation_queue:run(Q); + false -> rabbit_federation_queue:pause(Q) + end. + +active_unfederated(Cs) -> + not priority_queue:is_empty(Cs) andalso priority_queue:highest(Cs) >= 0. + +consumer_priority({_ChPid, #consumer{args = Args}}) -> + case rabbit_misc:table_lookup(Args, <<"x-priority">>) of + {_, Priority} -> Priority; + _ -> 0 + end. + attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -629,17 +655,17 @@ requeue(AckTags, ChPid, State) -> fun (State1) -> requeue_and_run(AckTags, State1) end). remove_consumer(ChPid, ConsumerTag, Queue) -> - queue:filter(fun ({CP, #consumer{tag = CTag}}) -> - (CP /= ChPid) or (CTag /= ConsumerTag) - end, Queue). + priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) -> + (CP /= ChPid) or (CTag /= ConsumerTag) + end, Queue). remove_consumers(ChPid, Queue, QName) -> - queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid -> - emit_consumer_deleted(ChPid, CTag, QName), - false; - (_) -> - true - end, Queue). + priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid -> + emit_consumer_deleted(ChPid, CTag, QName), + false; + (_) -> + true + end, Queue). possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of @@ -654,17 +680,17 @@ possibly_unblock(State, ChPid, Update) -> unblock(State, C = #cr{limiter = Limiter}) -> case lists:partition( - fun({_ChPid, #consumer{tag = CTag}}) -> + fun({_P, {_ChPid, #consumer{tag = CTag}}}) -> rabbit_limiter:is_consumer_blocked(Limiter, CTag) - end, queue:to_list(C#cr.blocked_consumers)) of + end, priority_queue:to_list(C#cr.blocked_consumers)) of {_, []} -> update_ch_record(C), State; {Blocked, Unblocked} -> - BlockedQ = queue:from_list(Blocked), - UnblockedQ = queue:from_list(Unblocked), + BlockedQ = priority_queue:from_list(Blocked), + UnblockedQ = priority_queue:from_list(Unblocked), update_ch_record(C#cr{blocked_consumers = BlockedQ}), - AC1 = queue:join(State#q.active_consumers, UnblockedQ), + AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ), run_message_queue(State#q{active_consumers = AC1}) end. @@ -1002,9 +1028,9 @@ consumers(#q{active_consumers = ActiveConsumers}) -> consumers(ActiveConsumers, []), all_ch_record()). consumers(Consumers, Acc) -> - rabbit_misc:queue_fold( - fun ({ChPid, #consumer{tag = CTag, ack_required = AckRequired}}, Acc1) -> - [{ChPid, CTag, AckRequired} | Acc1] + priority_queue:fold( + fun ({ChPid, #consumer{tag = CTag, ack_required = AckReq}}, _P, Acc1) -> + [{ChPid, CTag, AckReq} | Acc1] end, Acc, Consumers). emit_stats(State) -> @@ -1113,7 +1139,7 @@ handle_call({notify_down, ChPid}, _From, State) -> end; handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, - State = #q{q = #amqqueue{name = QName}}) -> + State = #q{q = Q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), case fetch(AckRequired, State1) of @@ -1134,7 +1160,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}, + ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg}, _From, State = #q{exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of in_use -> @@ -1158,7 +1184,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, false -> ok end, Consumer = #consumer{tag = ConsumerTag, - ack_required = not NoAck}, + ack_required = not NoAck, + args = OtherArgs}, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; true -> Holder end, @@ -1167,7 +1194,9 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ok = maybe_send_reply(ChPid, OkMsg), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck, qname(State1)), - AC1 = queue:in({ChPid, Consumer}, State1#q.active_consumers), + AC1 = priority_queue:in({ChPid, Consumer}, + consumer_priority({ChPid, Consumer}), + State1#q.active_consumers), reply(ok, run_message_queue(State1#q{active_consumers = AC1})) end; @@ -1198,6 +1227,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, active_consumers = remove_consumer( ChPid, ConsumerTag, State#q.active_consumers)}, + notify_federation(State1), case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); true -> stop(ok, State1) @@ -1374,6 +1404,10 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, end end); +handle_cast(notify_federation, State) -> + notify_federation(State), + noreply(State); + handle_cast(wake_up, State) -> noreply(State). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 22692dcb..da4d66c3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -734,7 +734,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_ack = NoAck, exclusive = ExclusiveConsume, nowait = NoWait, - arguments = Arguments}, + arguments = Args}, _, State = #ch{conn_pid = ConnPid, limiter = Limiter, consumer_mapping = ConsumerMapping}) -> @@ -755,12 +755,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin, case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> + {CreditArgs, OtherArgs} = parse_credit_args(Args), {rabbit_amqqueue:basic_consume( Q, NoAck, self(), rabbit_limiter:pid(Limiter), rabbit_limiter:is_active(Limiter), ActualConsumerTag, ExclusiveConsume, - parse_credit_args(Arguments), + CreditArgs, OtherArgs, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), Q} @@ -1224,12 +1225,12 @@ handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> parse_credit_args(Arguments) -> case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of - {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), - rabbit_misc:table_lookup(T, <<"drain">>)} of - {{long, Credit}, {boolean, Drain}} -> {Credit, Drain}; - _ -> none - end; - undefined -> none + {table, T} -> {case {rabbit_misc:table_lookup(T, <<"credit">>), + rabbit_misc:table_lookup(T, <<"drain">>)} of + {{long, Credit}, {bool, Drain}} -> {Credit, Drain}; + _ -> none + end, lists:keydelete(<<"x-credit">>, 1, Arguments)}; + undefined -> {none, Arguments} end. binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, |