summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit.erl22
-rw-r--r--src/rabbit_amqqueue_process.erl52
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_channel.erl9
-rw-r--r--src/rabbit_variable_queue.erl12
5 files changed, 60 insertions, 37 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 55736b28..4487f07c 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -355,6 +355,8 @@ handle_app_error(App, Reason) ->
throw({could_not_start, App, Reason}).
start_it(StartFun) ->
+ Marker = spawn_link(fun() -> receive stop -> ok end end),
+ register(rabbit_boot, Marker),
try
StartFun()
catch
@@ -363,11 +365,17 @@ start_it(StartFun) ->
_:Reason ->
boot_error(Reason, erlang:get_stacktrace())
after
+ unlink(Marker),
+ Marker ! stop,
%% give the error loggers some time to catch up
timer:sleep(100)
end.
stop() ->
+ case whereis(rabbit_boot) of
+ undefined -> ok;
+ _ -> await_startup()
+ end,
rabbit_log:info("Stopping RabbitMQ~n"),
ok = app_utils:stop_applications(app_shutdown_order()).
@@ -719,13 +727,13 @@ erts_version_check() ->
print_banner() ->
{ok, Product} = application:get_key(id),
{ok, Version} = application:get_key(vsn),
- io:format("~n## ## ~s ~s. ~s"
- "~n## ## ~s"
- "~n##########"
- "~n###### ## Logs: ~s"
- "~n########## ~s"
- "~n"
- "~n Starting broker...",
+ io:format("~n ~s ~s. ~s"
+ "~n ## ## ~s"
+ "~n ## ##"
+ "~n ########## Logs: ~s"
+ "~n ###### ## ~s"
+ "~n ##########"
+ "~n Starting broker...",
[Product, Version, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE,
log_location(kernel), log_location(sasl)]).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fe3a6099..4a0ccf81 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -470,10 +470,9 @@ deliver_msg_to_consumer(DeliverFun,
{Stop, State1}.
deliver_from_queue_deliver(AckRequired, State) ->
- {Result, State1} = fetch(AckRequired, State),
- State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(State1),
- {Result, BQ:is_empty(BQS), State2}.
+ {Result, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} =
+ fetch(AckRequired, State),
+ {Result, BQ:is_empty(BQS), State1}.
confirm_messages([], State) ->
State;
@@ -520,13 +519,11 @@ discard(#delivery{sender = SenderPid,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
State1#q{backing_queue_state = BQS1}.
-run_message_queue(State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(State),
- {_IsEmpty1, State2} = deliver_msgs_to_consumers(
+run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ {_IsEmpty1, State1} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
- BQ:is_empty(BQS), State1),
- State2.
+ BQ:is_empty(BQS), State),
+ State1.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
@@ -560,20 +557,32 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
+ IsEmpty = BQ:is_empty(BQS),
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
- ensure_ttl_timer(Props#message_properties.expiry,
- State2#q{backing_queue_state = BQS1})
+ State3 = State2#q{backing_queue_state = BQS1},
+ %% optimisation: it would be perfectly safe to always
+ %% invoke drop_expired_msgs here, but that is expensive so
+ %% we only do that IFF the new message ends up at the head
+ %% of the queue (because the queue was empty) and has an
+ %% expiry. Only then may it need expiring straight away,
+ %% or, if expiry is not due yet, the expiry timer may need
+ %% (re)scheduling.
+ case {IsEmpty, Props#message_properties.expiry} of
+ {false, _} -> State3;
+ {true, undefined} -> State3;
+ {true, _} -> drop_expired_msgs(State3)
+ end
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- run_message_queue(State#q{backing_queue_state = BQS1}).
+ run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
- {Result, State#q{backing_queue_state = BQS1}}.
+ {Result, drop_expired_msgs(State#q{backing_queue_state = BQS1})}.
ack(AckTags, ChPid, State) ->
subtract_acks(ChPid, AckTags, State,
@@ -714,7 +723,13 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
drop_expired_msgs(State = #q{backing_queue_state = BQS,
backing_queue = BQ }) ->
- Now = now_micros(),
+ case BQ:is_empty(BQS) of
+ true -> State;
+ false -> drop_expired_msgs(now_micros(), State)
+ end.
+
+drop_expired_msgs(Now, State = #q{backing_queue_state = BQS,
+ backing_queue = BQ }) ->
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
{Props, State1} =
with_dlx(
@@ -1059,7 +1074,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case fetch(AckRequired, drop_expired_msgs(State1)) of
+ case fetch(AckRequired, State1) of
{empty, State2} ->
reply(empty, State2);
{{Message, IsDelivered, AckTag}, State2} ->
@@ -1132,7 +1147,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
handle_call(stat, _From, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(ensure_expiry_timer(State)),
+ ensure_expiry_timer(State),
reply({ok, BQ:len(BQS), consumer_count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, From,
@@ -1211,8 +1226,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- noreply(run_message_queue(
- State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}));
+ noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
State = #q{senders = Senders}) ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index c2b52a7c..2f247448 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -18,8 +18,6 @@
-ifdef(use_specs).
--export_type([async_callback/0]).
-
%% We can't specify a per-queue ack/state with callback signatures
-type(ack() :: any()).
-type(state() :: any()).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e74211af..0510afa9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -589,6 +589,15 @@ handle_method(_Method, _, State = #ch{state = closing}) ->
handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
{ok, State1} = notify_queues(State),
+ %% We issue the channel.close_ok response after a handshake with
+ %% the reader, the other half of which is ready_for_close. That
+ %% way the reader forgets about the channel before we send the
+ %% response (and this channel process terminates). If we didn't do
+ %% that, a channel.open for the same channel number, which a
+ %% client is entitled to send as soon as it has received the
+ %% close_ok, might be received by the reader before it has seen
+ %% the termination and hence be sent to the old, now dead/dying
+ %% channel process, instead of a new process, and thus lost.
ReaderPid ! {channel_closing, self()},
{noreply, State1};
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 5d463f57..f7c6c729 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -262,8 +262,6 @@
durable,
transient_threshold,
- async_callback,
-
len,
persistent_count,
@@ -356,8 +354,6 @@
durable :: boolean(),
transient_threshold :: non_neg_integer(),
- async_callback :: rabbit_backing_queue:async_callback(),
-
len :: non_neg_integer(),
persistent_count :: non_neg_integer(),
@@ -426,7 +422,7 @@ init(Queue, Recover, AsyncCallback) ->
init(#amqqueue { name = QueueName, durable = IsDurable }, false,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
- init(IsDurable, IndexState, 0, [], AsyncCallback,
+ init(IsDurable, IndexState, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
MsgOnDiskFun, AsyncCallback);
@@ -454,7 +450,7 @@ init(#amqqueue { name = QueueName, durable = true }, true,
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, Terms1, AsyncCallback,
+ init(true, IndexState, DeltaCount, Terms1,
PersistentClient, TransientClient).
terminate(_Reason, State) ->
@@ -1004,7 +1000,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
+init(IsDurable, IndexState, DeltaCount, Terms,
PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
@@ -1030,8 +1026,6 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
durable = IsDurable,
transient_threshold = NextSeqId,
- async_callback = AsyncCallback,
-
len = DeltaCount1,
persistent_count = DeltaCount1,