summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-03-15 16:01:49 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-03-15 16:01:49 +0000
commit41f7641bd9d81ee6f45340dd6b5f6549e42e705d (patch)
treee476a7aaf9be1066076827c9078759396fba5f17
parentee334419d66b60de7c85bdf233ffc9042ed2df40 (diff)
parenta4956c5164e652c38a7ad1af3e89b1f7af931672 (diff)
downloadrabbitmq-server-41f7641bd9d81ee6f45340dd6b5f6549e42e705d.tar.gz
merge bug25488 into default
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_amqqueue.erl10
-rw-r--r--src/rabbit_amqqueue_process.erl65
-rw-r--r--src/rabbit_disk_monitor.erl7
-rw-r--r--src/rabbit_exchange.erl3
5 files changed, 67 insertions, 20 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index f3ba022a..3cfa21ba 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -236,7 +236,7 @@
{memory, any()}]).
-spec(is_running/0 :: () -> boolean()).
-spec(is_running/1 :: (node()) -> boolean()).
--spec(environment/0 :: () -> [{param() | term()}]).
+-spec(environment/0 :: () -> [{param(), term()}]).
-spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())).
-spec(force_event_refresh/0 :: () -> 'ok').
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ae7fe5c5..82ac74fa 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -407,7 +407,8 @@ args() ->
[{<<"x-expires">>, fun check_expires_arg/2},
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-exchange">>, fun check_string_arg/2},
- {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}].
+ {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
+ {<<"x-max-length">>, fun check_max_length_arg/2}].
check_string_arg({longstr, _}, _Args) -> ok;
check_string_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}.
@@ -418,6 +419,13 @@ check_int_arg({Type, _}, _) ->
false -> {error, {unacceptable_type, Type}}
end.
+check_max_length_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val >= 0 -> ok;
+ ok -> {error, {value_negative, Val}};
+ Error -> Error
+ end.
+
check_expires_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fba58d38..18b641d4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -55,6 +55,7 @@
queue_monitors,
dlx,
dlx_routing_key,
+ max_length,
status
}).
@@ -242,7 +243,8 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
[{<<"x-expires">>, fun init_expires/2},
{<<"x-dead-letter-exchange">>, fun init_dlx/2},
{<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2},
- {<<"x-message-ttl">>, fun init_ttl/2}]).
+ {<<"x-message-ttl">>, fun init_ttl/2},
+ {<<"x-max-length">>, fun init_max_length/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
@@ -254,6 +256,8 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
init_dlx_routing_key(RoutingKey, State) ->
State#q{dlx_routing_key = RoutingKey}.
+init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS} =
lists:foldl(fun (F, S) -> F(S) end, State,
@@ -557,27 +561,50 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- IsEmpty = BQ:is_empty(BQS),
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
- State3 = State2#q{backing_queue_state = BQS1},
+ {Dropped, State3 = #q{backing_queue_state = BQS2}} =
+ maybe_drop_head(State2#q{backing_queue_state = BQS1}),
+ QLen = BQ:len(BQS2),
%% optimisation: it would be perfectly safe to always
%% invoke drop_expired_msgs here, but that is expensive so
- %% we only do that IFF the new message ends up at the head
- %% of the queue (because the queue was empty) and has an
- %% expiry. Only then may it need expiring straight away,
- %% or, if expiry is not due yet, the expiry timer may need
- %% (re)scheduling.
- case {IsEmpty, Props#message_properties.expiry} of
- {false, _} -> State3;
- {true, undefined} -> State3;
- {true, _} -> drop_expired_msgs(State3)
+ %% we only do that if a new message that might have an
+ %% expiry ends up at the head of the queue. If the head
+ %% remains unchanged, or if the newly published message
+ %% has no expiry and becomes the head of the queue then
+ %% the call is unnecessary.
+ case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of
+ {false, false, _} -> State3;
+ {true, true, undefined} -> State3;
+ {_, _, _} -> drop_expired_msgs(State3)
end
end.
+maybe_drop_head(State = #q{max_length = undefined}) ->
+ {0, State};
+maybe_drop_head(State = #q{max_length = MaxLen,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ case BQ:len(BQS) - MaxLen of
+ Excess when Excess > 0 ->
+ {Excess,
+ with_dlx(
+ State#q.dlx,
+ fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end,
+ fun () ->
+ {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) ->
+ BQ:drop(false, BQS0)
+ end, {ok, BQS},
+ lists:seq(1, Excess)),
+ State#q{backing_queue_state = BQS1}
+ end)};
+ _ -> {0, State}
+ end.
+
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})).
+ {_Dropped, State1} = maybe_drop_head(State#q{backing_queue_state = BQS1}),
+ run_message_queue(drop_expired_msgs(State1)).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -761,6 +788,18 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
end, rejected, X, State),
State1.
+dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) ->
+ {ok, State1} =
+ dead_letter_msgs(
+ fun (DLFun, Acc, BQS) ->
+ lists:foldl(fun (_, {ok, Acc0, BQS0}) ->
+ {{Msg, _, AckTag}, BQS1} =
+ BQ:fetch(true, BQS0),
+ {ok, DLFun(Msg, AckTag, Acc0), BQS1}
+ end, {ok, Acc, BQS}, lists:seq(1, Excess))
+ end, maxlen, X, State),
+ State1.
+
dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
publish_seqno = SeqNo0,
unconfirmed = UC0,
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index b396b289..3bb163a1 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -31,6 +31,7 @@
-record(state, {dir,
limit,
+ actual,
timeout,
timer,
alarmed
@@ -106,8 +107,8 @@ handle_call({set_check_interval, Timeout}, _From, State) ->
{ok, cancel} = timer:cancel(State#state.timer),
{reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
-handle_call(get_disk_free, _From, State = #state { dir = Dir }) ->
- {reply, get_disk_free(Dir), State};
+handle_call(get_disk_free, _From, State = #state { actual = Actual }) ->
+ {reply, Actual, State};
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -156,7 +157,7 @@ internal_update(State = #state { limit = Limit,
_ ->
ok
end,
- State #state {alarmed = NewAlarmed}.
+ State #state {alarmed = NewAlarmed, actual = CurrentFreeBytes}.
get_disk_free(Dir) ->
get_disk_free(Dir, os:type()).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 2de7f8a4..27d60872 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -447,8 +447,7 @@ peek_serial(XName, LockType) ->
end.
invalid_module(T) ->
- rabbit_log:warning(
- "Could not find exchange type ~s.~n", [T]),
+ rabbit_log:warning("Could not find exchange type ~s.~n", [T]),
put({xtype_to_module, T}, rabbit_exchange_type_invalid),
rabbit_exchange_type_invalid.