diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 62 |
1 files changed, 23 insertions, 39 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 854ba640..64d55684 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -122,27 +122,7 @@ info_keys() -> ?INFO_KEYS. init(Q) -> process_flag(trap_exit, true), - State = #q{q = Q#amqqueue{pid = self()}, - exclusive_consumer = none, - has_had_consumers = false, - backing_queue = undefined, - backing_queue_state = undefined, - active_consumers = queue:new(), - expires = undefined, - sync_timer_ref = undefined, - rate_timer_ref = undefined, - expiry_timer_ref = undefined, - ttl = undefined, - senders = pmon:new(), - dlx = undefined, - dlx_routing_key = undefined, - publish_seqno = 1, - unconfirmed = dtree:empty(), - delayed_stop = undefined, - queue_monitors = pmon:new(), - msg_id_to_channel = gb_trees:empty(), - status = running}, - {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, + {ok, init_state(Q#amqqueue{pid = self()}), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, @@ -151,28 +131,29 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, none -> ok; _ -> erlang:monitor(process, Owner) end, + State = init_state(Q), + State1 = State#q{backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = RateTRef, + senders = Senders, + msg_id_to_channel = MTC}, + State2 = process_args(State1), + lists:foldl(fun (Delivery, StateN) -> + deliver_or_enqueue(Delivery, true, StateN) + end, State2, Deliveries). + +init_state(Q) -> State = #q{q = Q, exclusive_consumer = none, has_had_consumers = false, - backing_queue = BQ, - backing_queue_state = BQS, active_consumers = queue:new(), - expires = undefined, - sync_timer_ref = undefined, - rate_timer_ref = RateTRef, - expiry_timer_ref = undefined, - ttl = undefined, - senders = Senders, + senders = pmon:new(), publish_seqno = 1, unconfirmed = dtree:empty(), - delayed_stop = undefined, queue_monitors = pmon:new(), - msg_id_to_channel = MTC, + msg_id_to_channel = gb_trees:empty(), status = running}, - State1 = process_args(rabbit_event:init_stats_timer(State, #q.stats_timer)), - lists:foldl(fun (Delivery, StateN) -> - deliver_or_enqueue(Delivery, true, StateN) - end, State1, Deliveries). + rabbit_event:init_stats_timer(State, #q.stats_timer). terminate(shutdown = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); @@ -520,11 +501,14 @@ send_or_record_confirm(#delivery{sender = SenderPid, rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), {immediately, State}. -discard(#delivery{sender = SenderPid, message = #basic_message{id = MsgId}}, - State) -> - %% fake an 'eventual' confirm from BQ; noop if not needed +discard(#delivery{sender = SenderPid, + msg_seq_no = MsgSeqNo, + message = #basic_message{id = MsgId}}, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - confirm_messages([MsgId], State), + case MsgSeqNo of + undefined -> State; + _ -> confirm_messages([MsgId], State) + end, BQS1 = BQ:discard(MsgId, SenderPid, BQS), State1#q{backing_queue_state = BQS1}. |