diff options
-rw-r--r-- | src/rabbit.erl | 22 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 52 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 9 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 12 |
5 files changed, 60 insertions, 37 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 55736b28..4487f07c 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -355,6 +355,8 @@ handle_app_error(App, Reason) -> throw({could_not_start, App, Reason}). start_it(StartFun) -> + Marker = spawn_link(fun() -> receive stop -> ok end end), + register(rabbit_boot, Marker), try StartFun() catch @@ -363,11 +365,17 @@ start_it(StartFun) -> _:Reason -> boot_error(Reason, erlang:get_stacktrace()) after + unlink(Marker), + Marker ! stop, %% give the error loggers some time to catch up timer:sleep(100) end. stop() -> + case whereis(rabbit_boot) of + undefined -> ok; + _ -> await_startup() + end, rabbit_log:info("Stopping RabbitMQ~n"), ok = app_utils:stop_applications(app_shutdown_order()). @@ -719,13 +727,13 @@ erts_version_check() -> print_banner() -> {ok, Product} = application:get_key(id), {ok, Version} = application:get_key(vsn), - io:format("~n## ## ~s ~s. ~s" - "~n## ## ~s" - "~n##########" - "~n###### ## Logs: ~s" - "~n########## ~s" - "~n" - "~n Starting broker...", + io:format("~n ~s ~s. ~s" + "~n ## ## ~s" + "~n ## ##" + "~n ########## Logs: ~s" + "~n ###### ## ~s" + "~n ##########" + "~n Starting broker...", [Product, Version, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE, log_location(kernel), log_location(sasl)]). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fe3a6099..4a0ccf81 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -470,10 +470,9 @@ deliver_msg_to_consumer(DeliverFun, {Stop, State1}. deliver_from_queue_deliver(AckRequired, State) -> - {Result, State1} = fetch(AckRequired, State), - State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = - drop_expired_msgs(State1), - {Result, BQ:is_empty(BQS), State2}. + {Result, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} = + fetch(AckRequired, State), + {Result, BQ:is_empty(BQS), State1}. confirm_messages([], State) -> State; @@ -520,13 +519,11 @@ discard(#delivery{sender = SenderPid, BQS1 = BQ:discard(MsgId, SenderPid, BQS), State1#q{backing_queue_state = BQS1}. -run_message_queue(State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - drop_expired_msgs(State), - {_IsEmpty1, State2} = deliver_msgs_to_consumers( +run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + {_IsEmpty1, State1} = deliver_msgs_to_consumers( fun deliver_from_queue_deliver/2, - BQ:is_empty(BQS), State1), - State2. + BQ:is_empty(BQS), State), + State1. attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, @@ -560,20 +557,32 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, {false, State2 = #q{ttl = 0, dlx = undefined}} -> discard(Delivery, State2); {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> + IsEmpty = BQ:is_empty(BQS), BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), - ensure_ttl_timer(Props#message_properties.expiry, - State2#q{backing_queue_state = BQS1}) + State3 = State2#q{backing_queue_state = BQS1}, + %% optimisation: it would be perfectly safe to always + %% invoke drop_expired_msgs here, but that is expensive so + %% we only do that IFF the new message ends up at the head + %% of the queue (because the queue was empty) and has an + %% expiry. Only then may it need expiring straight away, + %% or, if expiry is not due yet, the expiry timer may need + %% (re)scheduling. + case {IsEmpty, Props#message_properties.expiry} of + {false, _} -> State3; + {true, undefined} -> State3; + {true, _} -> drop_expired_msgs(State3) + end end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - run_message_queue(State#q{backing_queue_state = BQS1}). + run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})). fetch(AckRequired, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), - {Result, State#q{backing_queue_state = BQS1}}. + {Result, drop_expired_msgs(State#q{backing_queue_state = BQS1})}. ack(AckTags, ChPid, State) -> subtract_acks(ChPid, AckTags, State, @@ -714,7 +723,13 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) -> drop_expired_msgs(State = #q{backing_queue_state = BQS, backing_queue = BQ }) -> - Now = now_micros(), + case BQ:is_empty(BQS) of + true -> State; + false -> drop_expired_msgs(now_micros(), State) + end. + +drop_expired_msgs(Now, State = #q{backing_queue_state = BQS, + backing_queue = BQ }) -> ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, {Props, State1} = with_dlx( @@ -1059,7 +1074,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), - case fetch(AckRequired, drop_expired_msgs(State1)) of + case fetch(AckRequired, State1) of {empty, State2} -> reply(empty, State2); {{Message, IsDelivered, AckTag}, State2} -> @@ -1132,7 +1147,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, handle_call(stat, _From, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - drop_expired_msgs(ensure_expiry_timer(State)), + ensure_expiry_timer(State), reply({ok, BQ:len(BQS), consumer_count()}, State1); handle_call({delete, IfUnused, IfEmpty}, From, @@ -1211,8 +1226,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - noreply(run_message_queue( - State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)})); + noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}); handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, State = #q{senders = Senders}) -> diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index c2b52a7c..2f247448 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -18,8 +18,6 @@ -ifdef(use_specs). --export_type([async_callback/0]). - %% We can't specify a per-queue ack/state with callback signatures -type(ack() :: any()). -type(state() :: any()). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e74211af..0510afa9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -589,6 +589,15 @@ handle_method(_Method, _, State = #ch{state = closing}) -> handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> {ok, State1} = notify_queues(State), + %% We issue the channel.close_ok response after a handshake with + %% the reader, the other half of which is ready_for_close. That + %% way the reader forgets about the channel before we send the + %% response (and this channel process terminates). If we didn't do + %% that, a channel.open for the same channel number, which a + %% client is entitled to send as soon as it has received the + %% close_ok, might be received by the reader before it has seen + %% the termination and hence be sent to the old, now dead/dying + %% channel process, instead of a new process, and thus lost. ReaderPid ! {channel_closing, self()}, {noreply, State1}; diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 5d463f57..f7c6c729 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -262,8 +262,6 @@ durable, transient_threshold, - async_callback, - len, persistent_count, @@ -356,8 +354,6 @@ durable :: boolean(), transient_threshold :: non_neg_integer(), - async_callback :: rabbit_backing_queue:async_callback(), - len :: non_neg_integer(), persistent_count :: non_neg_integer(), @@ -426,7 +422,7 @@ init(Queue, Recover, AsyncCallback) -> init(#amqqueue { name = QueueName, durable = IsDurable }, false, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), - init(IsDurable, IndexState, 0, [], AsyncCallback, + init(IsDurable, IndexState, 0, [], case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, MsgOnDiskFun, AsyncCallback); @@ -454,7 +450,7 @@ init(#amqqueue { name = QueueName, durable = true }, true, rabbit_msg_store:contains(MsgId, PersistentClient) end, MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, Terms1, AsyncCallback, + init(true, IndexState, DeltaCount, Terms1, PersistentClient, TransientClient). terminate(_Reason, State) -> @@ -1004,7 +1000,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, +init(IsDurable, IndexState, DeltaCount, Terms, PersistentClient, TransientClient) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), @@ -1030,8 +1026,6 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, durable = IsDurable, transient_threshold = NextSeqId, - async_callback = AsyncCallback, - len = DeltaCount1, persistent_count = DeltaCount1, |