summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-01-04 12:12:11 +0000
committerEmile Joubert <emile@rabbitmq.com>2013-01-04 12:12:11 +0000
commit3c29df8f13138df0a600ca5f642f0048784d6317 (patch)
tree9d5f531673399f0ef860b9aaf82a4dedea7964b0
parent3179a6148228279d5aa491cbec401d4976375179 (diff)
parent653d599979edb2f0a38c6916c2de49a793d4fb56 (diff)
downloadrabbitmq-server-3c29df8f13138df0a600ca5f642f0048784d6317.tar.gz
Merged default into bug19375
-rw-r--r--src/dtree.erl24
-rw-r--r--src/rabbit_amqqueue.erl10
-rw-r--r--src/rabbit_amqqueue_process.erl50
-rw-r--r--src/rabbit_channel.erl9
-rw-r--r--src/rabbit_misc.erl5
5 files changed, 85 insertions, 13 deletions
diff --git a/src/dtree.erl b/src/dtree.erl
index ca2d30cf..c59243bb 100644
--- a/src/dtree.erl
+++ b/src/dtree.erl
@@ -32,7 +32,7 @@
-module(dtree).
--export([empty/0, insert/4, take/3, take/2, take_all/2,
+-export([empty/0, insert/4, take/3, take/2, take_all/2, take_prim/2,
is_defined/2, is_empty/1, smallest/1, size/1]).
%%----------------------------------------------------------------------------
@@ -53,6 +53,7 @@
-spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
-spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
-spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
+-spec(take_prim/2 :: (pk(), ?MODULE()) -> {[kv()], ?MODULE()}).
-spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()).
-spec(is_empty/1 :: (?MODULE()) -> boolean()).
-spec(smallest/1 :: (?MODULE()) -> kv()).
@@ -120,6 +121,13 @@ take_all(SK, {P, S}) ->
{KVs, {P1, prune(SKS, PKS, S)}}
end.
+%% Drop the entry with the given primary key
+take_prim(PK, {P, S} = DTree) ->
+ case gb_trees:lookup(PK, P) of
+ none -> {[], DTree};
+ {value, {SKS, V}} -> {[{PK, V}], take_prim2(PK, SKS, DTree)}
+ end.
+
is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S).
is_empty({P, _S}) -> gb_trees:is_empty(P).
@@ -149,6 +157,20 @@ take_all2(PKS, P) ->
gb_trees:delete(PK, P0)}
end, {[], gb_sets:empty(), P}, PKS).
+take_prim2(PK, SKS, {P, S}) ->
+ {gb_trees:delete(PK, P),
+ rabbit_misc:gb_trees_fold(
+ fun (SK0, PKS, S1) ->
+ case gb_sets:is_member(SK0, SKS) of
+ false -> S1;
+ true -> PKS1 = gb_sets:delete(PK, PKS),
+ case gb_sets:is_empty(PKS1) of
+ true -> gb_trees:delete(SK0, S1);
+ false -> gb_trees:update(SK0, PKS1, S1)
+ end
+ end
+ end, S, S)}.
+
prune(SKS, PKS, S) ->
gb_sets:fold(fun (SK0, S0) ->
PKS1 = gb_trees:get(SK0, S0),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1a270364..dacf4f0a 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -389,7 +389,8 @@ check_declare_arguments(QueueName, Args) ->
Checks = [{<<"x-expires">>, fun check_expires_arg/2},
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-exchange">>, fun check_string_arg/2},
- {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}],
+ {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
+ {<<"x-maxdepth">>, fun check_maxdepth_arg/2}],
[case rabbit_misc:table_lookup(Args, Key) of
undefined -> ok;
TypeVal -> case Fun(TypeVal, Args) of
@@ -412,6 +413,13 @@ check_int_arg({Type, _}, _) ->
false -> {error, {unacceptable_type, Type}}
end.
+check_maxdepth_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val > 0 -> ok;
+ ok -> {error, {value_not_positive, Val}};
+ Error -> Error
+ end.
+
check_expires_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f9614517..f588c024 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -54,7 +54,8 @@
delayed_stop,
queue_monitors,
dlx,
- dlx_routing_key
+ dlx_routing_key,
+ max_depth
}).
-record(consumer, {tag, ack_required}).
@@ -134,6 +135,7 @@ init(Q) ->
senders = pmon:new(),
dlx = undefined,
dlx_routing_key = undefined,
+ max_depth = undefined,
publish_seqno = 1,
unconfirmed = dtree:empty(),
delayed_stop = undefined,
@@ -159,6 +161,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
rate_timer_ref = RateTRef,
expiry_timer_ref = undefined,
ttl = undefined,
+ max_depth = undefined,
senders = Senders,
publish_seqno = 1,
unconfirmed = dtree:empty(),
@@ -258,7 +261,8 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
[{<<"x-expires">>, fun init_expires/2},
{<<"x-dead-letter-exchange">>, fun init_dlx/2},
{<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2},
- {<<"x-message-ttl">>, fun init_ttl/2}]).
+ {<<"x-message-ttl">>, fun init_ttl/2},
+ {<<"x-maxdepth">>, fun init_maxdepth/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
@@ -270,6 +274,9 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
init_dlx_routing_key(RoutingKey, State) ->
State#q{dlx_routing_key = RoutingKey}.
+init_maxdepth(MaxDepth, State) ->
+ State#q{max_depth = MaxDepth}.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
@@ -553,7 +560,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
{false, State#q{backing_queue_state = BQS1}}
end.
-deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
+deliver_or_enqueue(Delivery = #delivery{message = Message},
Delivered, State) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State),
@@ -563,10 +570,37 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
%% The next one is an optimisation
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
- {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
- ensure_ttl_timer(Props#message_properties.expiry,
- State2#q{backing_queue_state = BQS1})
+ {false, State2} ->
+ case publish_max(Delivery, Props, Delivered, State2) of
+ nopub -> State2;
+ BQS1 -> ensure_ttl_timer(Props#message_properties.expiry,
+ State2#q{backing_queue_state = BQS1})
+ end
+ end.
+
+publish_max(#delivery{message = Message,
+ sender = SenderPid},
+ Props, Delivered, #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ max_depth = undefined}) ->
+ BQ:publish(Message, Props, Delivered, SenderPid, BQS);
+publish_max(#delivery{message = Message,
+ msg_seq_no = MsgSeqNo,
+ sender = SenderPid},
+ Props, Delivered, #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ max_depth = MaxDepth}) ->
+ case {BQ:depth(BQS) >= MaxDepth, BQ:len(BQS) =:= 0} of
+ {false, _} ->
+ BQ:publish(Message, Props, Delivered, SenderPid, BQS);
+ {true, true} ->
+ (dead_letter_fun(maxdepth))([{Message, undefined}]),
+ rabbit_misc:confirm_all(SenderPid, MsgSeqNo),
+ nopub;
+ {true, false} ->
+ {{Msg, _IsDelivered, AckTag}, BQS1} = BQ:fetch(true, BQS),
+ (dead_letter_fun(maxdepth))([{Msg, AckTag}]),
+ BQ:publish(Message, Props, Delivered, SenderPid, BQS1)
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
@@ -800,7 +834,7 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
unconfirmed = UC,
backing_queue = BQ,
backing_queue_state = BQS}) ->
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ {_Guids, BQS1} = BQ:ack([Ack || Ack <- AckTags, Ack /= undefined], BQS),
State1 = State#q{backing_queue_state = BQS1},
case dtree:is_empty(UC) andalso DS =/= undefined of
true -> case DS of
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 1af60de8..885452ce 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -228,8 +228,9 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
- {confirm, _MsgSeqNos, _QPid} -> 5;
- _ -> 0
+ {confirm, _MsgSeqNos, _QPid} -> 5;
+ {confirm_all, _MsgSeqNo, _QPid} -> 5;
+ _ -> 0
end.
prioritise_info(Msg, _State) ->
@@ -556,6 +557,10 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
{MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
record_confirms(MXs, State#ch{unconfirmed = UC1}).
+confirm_all(MsgSeqNo, State = #ch{unconfirmed = UC}) ->
+ {MXs, UC1} = dtree:take_prim(MsgSeqNo, UC),
+ record_confirms(MXs, State#ch{unconfirmed = UC1}).
+
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index edaa7198..c6c8676f 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -28,7 +28,7 @@
-export([enable_cover/0, report_cover/0]).
-export([enable_cover/1, report_cover/1]).
-export([start_cover/1]).
--export([confirm_to_sender/2]).
+-export([confirm_to_sender/2, confirm_all/2]).
-export([throw_on_error/2, with_exit_handler/2, is_abnormal_exit/1,
filter_exit_map/2]).
-export([with_user/2, with_user_and_vhost/3]).
@@ -427,6 +427,9 @@ report_coverage_percentage(File, Cov, NotCov, Mod) ->
confirm_to_sender(Pid, MsgSeqNos) ->
gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
+confirm_all(Pid, MsgSeqNo) ->
+ gen_server2:cast(Pid, {confirm_all, MsgSeqNo, self()}).
+
%% @doc Halts the emulator returning the given status code to the os.
%% On Windows this function will block indefinitely so as to give the io
%% subsystem time to flush stdout completely.