summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-04-12 11:08:48 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-04-12 11:08:48 +0100
commitfe05a3c993839b2f2eed8fa9ba54ec61530bedbb (patch)
treefc0b02f1b93c69788a21abac572520514f68dbdb
parent0a9e0d95837b1b895a8b39edfd990f99b6999837 (diff)
parent797c633e2a56ffed0ae338979471a97bc7f26bbb (diff)
downloadrabbitmq-server-fe05a3c993839b2f2eed8fa9ba54ec61530bedbb.tar.gz
Merged default
-rw-r--r--packaging/debs/Debian/debian/control2
-rw-r--r--packaging/debs/Debian/debian/postrm.in9
-rw-r--r--src/dtree.erl160
-rw-r--r--src/rabbit_amqqueue.erl23
-rw-r--r--src/rabbit_amqqueue_process.erl242
-rw-r--r--src/rabbit_basic.erl13
-rw-r--r--src/rabbit_channel.erl103
-rw-r--r--src/rabbit_exchange.erl5
-rw-r--r--src/rabbit_mirror_queue_master.erl9
-rw-r--r--src/rabbit_mirror_queue_slave.erl2
-rw-r--r--src/rabbit_misc.erl13
-rw-r--r--src/rabbit_tests.erl6
12 files changed, 334 insertions, 253 deletions
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index fb02cd6a..e935acf5 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -2,6 +2,8 @@ Source: rabbitmq-server
Section: net
Priority: extra
Maintainer: RabbitMQ Team <packaging@rabbitmq.com>
+Uploader: Emile Joubert <emile@rabbitmq.com>
+DM-Upload-Allowed: yes
Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:12.b.3), erlang-src (>= 1:12.b.3), unzip, zip
Standards-Version: 3.8.0
diff --git a/packaging/debs/Debian/debian/postrm.in b/packaging/debs/Debian/debian/postrm.in
index baf081fc..c2e9bbfe 100644
--- a/packaging/debs/Debian/debian/postrm.in
+++ b/packaging/debs/Debian/debian/postrm.in
@@ -35,20 +35,15 @@ case "$1" in
if [ -d /etc/rabbitmq ]; then
rm -r /etc/rabbitmq
fi
- remove_plugin_traces
+ remove_plugin_traces
if getent passwd rabbitmq >/dev/null; then
# Stop epmd if run by the rabbitmq user
pkill -u rabbitmq epmd || :
-
- deluser rabbitmq
- fi
- if getent group rabbitmq >/dev/null; then
- delgroup rabbitmq
fi
;;
remove|upgrade)
- remove_plugin_traces
+ remove_plugin_traces
;;
failed-upgrade|abort-install|abort-upgrade|disappear)
diff --git a/src/dtree.erl b/src/dtree.erl
new file mode 100644
index 00000000..67bbbc1b
--- /dev/null
+++ b/src/dtree.erl
@@ -0,0 +1,160 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+%% A dual-index tree.
+%%
+%% Entries have the following shape:
+%%
+%% +----+--------------------+---+
+%% | PK | SK1, SK2, ..., SKN | V |
+%% +----+--------------------+---+
+%%
+%% i.e. a primary key, set of secondary keys, and a value.
+%%
+%% There can be only one entry per primary key, but secondary keys may
+%% appear in multiple entries.
+%%
+%% The set of secondary keys must be non-empty. Or, to put it another
+%% way, entries only exist while their secondary key set is non-empty.
+
+-module(dtree).
+
+-export([empty/0, insert/4, take/3, take/2, take_all/2,
+ is_defined/2, is_empty/1, smallest/1, size/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-export_type([?MODULE/0]).
+
+-opaque(?MODULE() :: {gb_tree(), gb_tree()}).
+
+-type(pk() :: any()).
+-type(sk() :: any()).
+-type(val() :: any()).
+-type(kv() :: {pk(), val()}).
+
+-spec(empty/0 :: () -> ?MODULE()).
+-spec(insert/4 :: (pk(), [sk()], val(), ?MODULE()) -> ?MODULE()).
+-spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
+-spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
+-spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
+-spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()).
+-spec(is_empty/1 :: (?MODULE()) -> boolean()).
+-spec(smallest/1 :: (?MODULE()) -> kv()).
+-spec(size/1 :: (?MODULE()) -> non_neg_integer()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+empty() -> {gb_trees:empty(), gb_trees:empty()}.
+
+%% Insert an entry. Fails if there already is an entry with the given
+%% primary key.
+insert(PK, [], V, {P, S}) ->
+ %% dummy insert to force error if PK exists
+ gb_trees:insert(PK, {gb_sets:empty(), V}, P),
+ {P, S};
+insert(PK, SKs, V, {P, S}) ->
+ {gb_trees:insert(PK, {gb_sets:from_list(SKs), V}, P),
+ lists:foldl(fun (SK, S0) ->
+ case gb_trees:lookup(SK, S0) of
+ {value, PKS} -> PKS1 = gb_sets:insert(PK, PKS),
+ gb_trees:update(SK, PKS1, S0);
+ none -> PKS = gb_sets:singleton(PK),
+ gb_trees:insert(SK, PKS, S0)
+ end
+ end, S, SKs)}.
+
+%% Remove the given secondary key from the entries of the given
+%% primary keys, returning the primary-key/value pairs of any entries
+%% that were dropped as the result (i.e. due to their secondary key
+%% set becoming empty). It is ok for the given primary keys and/or
+%% secondary key to not exist.
+take(PKs, SK, {P, S}) ->
+ case gb_trees:lookup(SK, S) of
+ none -> {[], {P, S}};
+ {value, PKS} -> TakenPKS = gb_sets:from_list(PKs),
+ PKSInter = gb_sets:intersection(PKS, TakenPKS),
+ PKSDiff = gb_sets:difference (PKS, TakenPKS),
+ {KVs, P1} = take2(PKSInter, SK, P),
+ {KVs, {P1, case gb_sets:is_empty(PKSDiff) of
+ true -> gb_trees:delete(SK, S);
+ false -> gb_trees:update(SK, PKSDiff, S)
+ end}}
+ end.
+
+%% Remove the given secondary key from all entries, returning the
+%% primary-key/value pairs of any entries that were dropped as the
+%% result (i.e. due to their secondary key set becoming empty). It is
+%% ok for the given secondary key to not exist.
+take(SK, {P, S}) ->
+ case gb_trees:lookup(SK, S) of
+ none -> {[], {P, S}};
+ {value, PKS} -> {KVs, P1} = take2(PKS, SK, P),
+ {KVs, {P1, gb_trees:delete(SK, S)}}
+ end.
+
+%% Drop all entries which contain the given secondary key, returning
+%% the primary-key/value pairs of these entries. It is ok for the
+%% given secondary key to not exist.
+take_all(SK, {P, S}) ->
+ case gb_trees:lookup(SK, S) of
+ none -> {[], {P, S}};
+ {value, PKS} -> {KVs, SKS, P1} = take_all2(PKS, P),
+ {KVs, {P1, prune(SKS, PKS, S)}}
+ end.
+
+is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S).
+
+is_empty({P, _S}) -> gb_trees:is_empty(P).
+
+smallest({P, _S}) -> {K, {_SKS, V}} = gb_trees:smallest(P),
+ {K, V}.
+
+size({P, _S}) -> gb_trees:size(P).
+
+%%----------------------------------------------------------------------------
+
+take2(PKS, SK, P) ->
+ gb_sets:fold(fun (PK, {KVs, P0}) ->
+ {SKS, V} = gb_trees:get(PK, P0),
+ SKS1 = gb_sets:delete(SK, SKS),
+ case gb_sets:is_empty(SKS1) of
+ true -> KVs1 = [{PK, V} | KVs],
+ {KVs1, gb_trees:delete(PK, P0)};
+ false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)}
+ end
+ end, {[], P}, PKS).
+
+take_all2(PKS, P) ->
+ gb_sets:fold(fun (PK, {KVs, SKS0, P0}) ->
+ {SKS, V} = gb_trees:get(PK, P0),
+ {[{PK, V} | KVs], gb_sets:union(SKS, SKS0),
+ gb_trees:delete(PK, P0)}
+ end, {[], gb_sets:empty(), P}, PKS).
+
+prune(SKS, PKS, S) ->
+ gb_sets:fold(fun (SK0, S0) ->
+ PKS1 = gb_trees:get(SK0, S0),
+ PKS2 = gb_sets:difference(PKS1, PKS),
+ case gb_sets:is_empty(PKS2) of
+ true -> gb_trees:delete(SK0, S0);
+ false -> gb_trees:update(SK0, PKS2, S0)
+ end
+ end, S, SKS).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9ecbcbc3..75091692 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -331,7 +331,7 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
check_declare_arguments(QueueName, Args) ->
Checks = [{<<"x-expires">>, fun check_positive_int_arg/2},
- {<<"x-message-ttl">>, fun check_positive_int_arg/2},
+ {<<"x-message-ttl">>, fun check_non_neg_int_arg/2},
{<<"x-ha-policy">>, fun check_ha_policy_arg/2},
{<<"x-dead-letter-exchange">>, fun check_string_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}],
@@ -353,11 +353,24 @@ check_string_arg({longstr, _}, _Args) ->
check_string_arg({Type, _}, _) ->
{error, {unacceptable_type, Type}}.
-check_positive_int_arg({Type, Val}, _Args) ->
+check_int_arg({Type, _}, _) ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
- false -> {error, {unacceptable_type, Type}};
- true when Val =< 0 -> {error, {value_zero_or_less, Val}};
- true -> ok
+ true -> ok;
+ false -> {error, {unacceptable_type, Type}}
+ end.
+
+check_positive_int_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val > 0 -> ok;
+ ok -> {error, {value_zero_or_less, Val}};
+ Error -> Error
+ end.
+
+check_non_neg_int_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val >= 0 -> ok;
+ ok -> {error, {value_less_than_zero, Val}};
+ Error -> Error
end.
check_dlxrk_arg({longstr, _}, Args) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e1fd9bbc..3bd13df1 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -48,8 +48,7 @@
ttl,
ttl_timer_ref,
publish_seqno,
- unconfirmed_mq,
- unconfirmed_qm,
+ unconfirmed,
delayed_stop,
queue_monitors,
dlx,
@@ -135,8 +134,7 @@ init(Q) ->
dlx = undefined,
dlx_routing_key = undefined,
publish_seqno = 1,
- unconfirmed_mq = gb_trees:empty(),
- unconfirmed_qm = gb_trees:empty(),
+ unconfirmed = dtree:empty(),
delayed_stop = undefined,
queue_monitors = dict:new(),
msg_id_to_channel = gb_trees:empty()},
@@ -161,8 +159,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
expiry_timer_ref = undefined,
ttl = undefined,
publish_seqno = 1,
- unconfirmed_mq = gb_trees:empty(),
- unconfirmed_qm = gb_trees:empty(),
+ unconfirmed = dtree:empty(),
delayed_stop = undefined,
queue_monitors = dict:new(),
msg_id_to_channel = MTC},
@@ -490,8 +487,10 @@ should_confirm_message(#delivery{sender = SenderPid,
id = MsgId}},
#q{q = #amqqueue{durable = true}}) ->
{eventually, SenderPid, MsgSeqNo, MsgId};
-should_confirm_message(_Delivery, _State) ->
- immediately.
+should_confirm_message(#delivery{sender = SenderPid,
+ msg_seq_no = MsgSeqNo},
+ _State) ->
+ {immediately, SenderPid, MsgSeqNo}.
needs_confirming({eventually, _, _, _}) -> true;
needs_confirming(_) -> false.
@@ -500,6 +499,9 @@ maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId},
State = #q{msg_id_to_channel = MTC}) ->
State#q{msg_id_to_channel =
gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)};
+maybe_record_confirm_message({immediately, SenderPid, MsgSeqNo}, State) ->
+ rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
+ State;
maybe_record_confirm_message(_Confirm, State) ->
State.
@@ -511,52 +513,50 @@ run_message_queue(State) ->
BQ:is_empty(BQS), State1),
State2.
-attempt_delivery(Delivery = #delivery{sender = SenderPid,
- message = Message,
- msg_seq_no = MsgSeqNo},
+attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- Confirm = should_confirm_message(Delivery, State),
- case Confirm of
- immediately -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]);
- _ -> ok
- end,
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
- DeliverFun =
- fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
- Props = message_properties(Confirm, State1),
- {AckTag, BQS3} = BQ:publish_delivered(
- AckRequired, Message, Props,
- SenderPid, BQS2),
- {{Message, false, AckTag}, true,
- State1#q{backing_queue_state = BQS3}}
- end,
- {Delivered, State2} =
- deliver_msgs_to_consumers(DeliverFun, false,
- State#q{backing_queue_state = BQS1}),
- {Delivered, Confirm, State2};
+ deliver_msgs_to_consumers(
+ fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
+ Props = message_properties(Confirm, State1),
+ {AckTag, BQS3} = BQ:publish_delivered(
+ AckRequired, Message, Props,
+ SenderPid, BQS2),
+ {{Message, false, AckTag}, true,
+ State1#q{backing_queue_state = BQS3}}
+ end, false, State#q{backing_queue_state = BQS1});
{Duplicate, BQS1} ->
%% if the message has previously been seen by the BQ then
%% it must have been seen under the same circumstances as
%% now: i.e. if it is now a deliver_immediately then it
%% must have been before.
- Delivered = case Duplicate of
- published -> true;
- discarded -> false
- end,
- {Delivered, Confirm, State#q{backing_queue_state = BQS1}}
+ {case Duplicate of
+ published -> true;
+ discarded -> false
+ end,
+ State#q{backing_queue_state = BQS1}}
end.
-deliver_or_enqueue(Delivery = #delivery{message = Message,
- sender = SenderPid}, State) ->
- {Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
- State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- maybe_record_confirm_message(Confirm, State1),
- case Delivered of
- true -> State2;
- false -> Props = message_properties(Confirm, State),
- BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
- ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
+deliver_or_enqueue(Delivery = #delivery{message = Message,
+ msg_seq_no = MsgSeqNo,
+ sender = SenderPid}, State) ->
+ Confirm = should_confirm_message(Delivery, State),
+ case attempt_delivery(Delivery, Confirm, State) of
+ {true, State1} ->
+ maybe_record_confirm_message(Confirm, State1);
+ %% the next two are optimisations
+ {false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never ->
+ discard_delivery(Delivery, State1);
+ {false, State1 = #q{ttl = 0, dlx = undefined}} ->
+ rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
+ discard_delivery(Delivery, State1);
+ {false, State1} ->
+ State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ maybe_record_confirm_message(Confirm, State1),
+ Props = message_properties(Confirm, State2),
+ BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
+ ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
@@ -728,11 +728,9 @@ dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) ->
end.
dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
- State = #q{publish_seqno = MsgSeqNo,
- unconfirmed_mq = UMQ,
- dlx = DLX,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ State = #q{publish_seqno = MsgSeqNo,
+ unconfirmed = UC,
+ dlx = DLX}) ->
{ok, _, QPids} =
rabbit_basic:publish(
rabbit_basic:delivery(
@@ -741,20 +739,9 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
State1 = lists:foldl(fun monitor_queue/2, State, QPids),
State2 = State1#q{publish_seqno = MsgSeqNo + 1},
case QPids of
- [] -> {_Guids, BQS1} = BQ:ack([AckTag], BQS),
- cleanup_after_confirm(State2#q{backing_queue_state = BQS1});
- _ -> State3 =
- lists:foldl(
- fun(QPid, State0 = #q{unconfirmed_qm = UQM}) ->
- UQM1 = rabbit_misc:gb_trees_set_insert(
- QPid, MsgSeqNo, UQM),
- State0#q{unconfirmed_qm = UQM1}
- end, State2, QPids),
- noreply(State3#q{
- unconfirmed_mq =
- gb_trees:insert(
- MsgSeqNo, {gb_sets:from_list(QPids),
- AckTag}, UMQ)})
+ [] -> cleanup_after_confirm([AckTag], State2);
+ _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC),
+ noreply(State2#q{unconfirmed = UC1})
end.
monitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
@@ -773,64 +760,30 @@ demonitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
end.
handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
- unconfirmed_qm = UQM}) ->
+ unconfirmed = UC}) ->
case dict:find(QPid, QMons) of
error ->
noreply(State);
{ok, _} ->
- rabbit_log:info("DLQ ~p (for ~s) died~n",
- [QPid, rabbit_misc:rs(qname(State))]),
- State1 = State#q{queue_monitors = dict:erase(QPid, QMons)},
- case gb_trees:lookup(QPid, UQM) of
- none ->
- noreply(State1);
- {value, MsgSeqNosSet} ->
- case rabbit_misc:is_abnormal_termination(Reason) of
- true -> rabbit_log:warning(
- "Dead queue lost ~p messages~n",
- [gb_sets:size(MsgSeqNosSet)]);
- false -> ok
- end,
- handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid, State1)
- end
+ case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> {Lost, _UC1} = dtree:take_all(QPid, UC),
+ rabbit_log:warning(
+ "DLQ ~p for ~s died with ~p unconfirmed messages~n",
+ [QPid, rabbit_misc:rs(qname(State)), length(Lost)]);
+ false -> ok
+ end,
+ {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
+ cleanup_after_confirm(
+ [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
+ State#q{queue_monitors = dict:erase(QPid, QMons),
+ unconfirmed = UC1})
end.
-handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ,
- unconfirmed_qm = UQM,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {AckTags1, UMQ3} =
- lists:foldl(
- fun (MsgSeqNo, {AckTags, UMQ1}) ->
- {QPids, AckTag} = gb_trees:get(MsgSeqNo, UMQ1),
- QPids1 = gb_sets:delete(QPid, QPids),
- case gb_sets:is_empty(QPids1) of
- true -> {[AckTag | AckTags],
- gb_trees:delete(MsgSeqNo, UMQ1)};
- false -> {AckTags, gb_trees:update(
- MsgSeqNo, {QPids1, AckTag}, UMQ1)}
- end
- end, {[], UMQ}, MsgSeqNos),
- {_Guids, BQS1} = BQ:ack(AckTags1, BQS),
- MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM),
- gb_sets:from_list(MsgSeqNos)),
- State1 = case gb_sets:is_empty(MsgSeqNos1) of
- false -> State#q{
- unconfirmed_qm =
- gb_trees:update(QPid, MsgSeqNos1, UQM)};
- true -> demonitor_queue(
- QPid, State#q{
- unconfirmed_qm =
- gb_trees:delete(QPid, UQM)})
- end,
- cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3,
- backing_queue_state = BQS1}).
-
stop_later(Reason, State) ->
stop_later(Reason, undefined, noreply, State).
-stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) ->
- case {gb_trees:is_empty(UMQ), Reply} of
+stop_later(Reason, From, Reply, State = #q{unconfirmed = UC}) ->
+ case {dtree:is_empty(UC), Reply} of
{true, noreply} ->
{stop, Reason, State};
{true, _} ->
@@ -839,16 +792,20 @@ stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) ->
noreply(State#q{delayed_stop = {Reason, {From, Reply}}})
end.
-cleanup_after_confirm(State = #q{delayed_stop = DS,
- unconfirmed_mq = UMQ}) ->
- case gb_trees:is_empty(UMQ) andalso DS =/= undefined of
+cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
+ unconfirmed = UC,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State1 = State#q{backing_queue_state = BQS1},
+ case dtree:is_empty(UC) andalso DS =/= undefined of
true -> case DS of
{_, {_, noreply}} -> ok;
{_, {From, Reply}} -> gen_server2:reply(From, Reply)
end,
{Reason, _} = DS,
- {stop, Reason, State};
- false -> noreply(State)
+ {stop, Reason, State1};
+ false -> noreply(State1)
end.
already_been_here(_Delivery, #q{dlx = undefined}) ->
@@ -881,29 +838,33 @@ make_dead_letter_msg(DLX, Reason,
exchange_name = Exchange,
routing_keys = RoutingKeys},
State = #q{dlx_routing_key = DlxRoutingKey}) ->
- Headers = rabbit_basic:extract_headers(Content),
- #resource{name = QName} = qname(State),
- %% The first routing key is the one specified in the
- %% basic.publish; all others are CC or BCC keys.
- RoutingKeys1 = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
- Info = [{<<"reason">>, longstr, list_to_binary(atom_to_list(Reason))},
- {<<"queue">>, longstr, QName},
- {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000},
- {<<"exchange">>, longstr, Exchange#resource.name},
- {<<"routing-keys">>, array,
- [{longstr, Key} || Key <- RoutingKeys1]}],
- Headers1 = rabbit_basic:append_table_header(<<"x-death">>, Info, Headers),
- {DeathRoutingKeys, Headers2} =
+ {DeathRoutingKeys, HeadersFun1} =
case DlxRoutingKey of
- undefined -> {RoutingKeys, Headers1};
+ undefined -> {RoutingKeys, fun (H) -> H end};
_ -> {[DlxRoutingKey],
- lists:keydelete(<<"CC">>, 1, Headers1)}
+ fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
end,
- Content1 = rabbit_basic:replace_headers(Headers2, Content),
+ #resource{name = QName} = qname(State),
+ HeadersFun2 =
+ fun (Headers) ->
+ %% The first routing key is the one specified in the
+ %% basic.publish; all others are CC or BCC keys.
+ RoutingKeys1 =
+ [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
+ Info = [{<<"reason">>,
+ longstr, list_to_binary(atom_to_list(Reason))},
+ {<<"queue">>, longstr, QName},
+ {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000},
+ {<<"exchange">>, longstr, Exchange#resource.name},
+ {<<"routing-keys">>, array,
+ [{longstr, Key} || Key <- RoutingKeys1]}],
+ HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>,
+ Info, Headers))
+ end,
+ Content1 = rabbit_basic:map_headers(HeadersFun2, Content),
Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(),
routing_keys = DeathRoutingKeys, content = Content1}.
-
now_micros() -> timer:now_diff(now(), {0,0,0}).
infos(Items, State) ->
@@ -1094,7 +1055,8 @@ handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
+ Confirm = should_confirm_message(Delivery, State),
+ {Delivered, State1} = attempt_delivery(Delivery, Confirm, State),
reply(Delivered, case Delivered of
true -> maybe_record_confirm_message(Confirm, State1);
false -> discard_delivery(Delivery, State1)
@@ -1229,8 +1191,14 @@ handle_call(force_event_refresh, _From,
end,
reply(ok, State).
-handle_cast({confirm, MsgSeqNos, QPid}, State) ->
- handle_confirm(MsgSeqNos, QPid, State);
+handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
+ {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC),
+ State1 = case dtree:is_defined(QPid, UC1) of
+ false -> demonitor_queue(QPid, State);
+ true -> State
+ end,
+ cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
+ State1#q{unconfirmed = UC1});
handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
noreply(State);
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index a89aa074..8ad59016 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -20,7 +20,7 @@
-export([publish/4, publish/6, publish/1,
message/3, message/4, properties/1, append_table_header/3,
- extract_headers/1, replace_headers/2, delivery/4, header_routes/1]).
+ extract_headers/1, map_headers/2, delivery/4, header_routes/1]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -63,8 +63,8 @@
-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()).
--spec(replace_headers/2 :: (headers(), rabbit_types:content())
- -> rabbit_types:content()).
+-spec(map_headers/2 :: (rabbit_types:content(), fun((headers()) -> headers()))
+ -> rabbit_types:content()).
-spec(header_routes/1 ::
(undefined | rabbit_framing:amqp_table()) -> [string()]).
@@ -193,9 +193,12 @@ extract_headers(Content) ->
rabbit_binary_parser:ensure_content_decoded(Content),
Headers.
-replace_headers(Headers, Content = #content{properties = Props}) ->
+map_headers(F, Content) ->
+ Content1 = rabbit_binary_parser:ensure_content_decoded(Content),
+ #content{properties = #'P_basic'{headers = Headers} = Props} = Content1,
+ Headers1 = F(Headers),
rabbit_binary_generator:clear_encoded_content(
- Content#content{properties = Props#'P_basic'{headers = Headers}}).
+ Content1#content{properties = Props#'P_basic'{headers = Headers1}}).
indexof(L, Element) -> indexof(L, Element, 1).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index cac622f8..0c1c11d8 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -37,8 +37,8 @@
uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
virtual_host, most_recently_declared_queue, queue_monitors,
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
- stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
- unconfirmed_qm, confirmed, capabilities, trace_state}).
+ stats_timer, confirm_enabled, publish_seqno, unconfirmed,
+ confirmed, capabilities, trace_state}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -201,8 +201,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
queue_collector_pid = CollectorPid,
confirm_enabled = false,
publish_seqno = 1,
- unconfirmed_mq = gb_trees:empty(),
- unconfirmed_qm = gb_trees:empty(),
+ unconfirmed = dtree:empty(),
confirmed = [],
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost)},
@@ -548,45 +547,9 @@ record_confirms(MXs, State = #ch{confirmed = C}) ->
confirm([], _QPid, State) ->
State;
-confirm(MsgSeqNos, QPid, State) ->
- {MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State),
- record_confirms(MXs, State1).
-
-process_confirms(MsgSeqNos, QPid, Nack, State) ->
- lists:foldl(
- fun(MsgSeqNo, {_MXs, _State = #ch{unconfirmed_mq = UMQ0}} = Acc) ->
- case gb_trees:lookup(MsgSeqNo, UMQ0) of
- {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
- Acc, Nack);
- none -> Acc
- end
- end, {[], State}, MsgSeqNos).
-
-remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs},
- {MXs, State = #ch{unconfirmed_mq = UMQ,
- unconfirmed_qm = UQM}},
- Nack) ->
- State1 = case gb_trees:lookup(QPid, UQM) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
- case gb_sets:is_empty(MsgSeqNos1) of
- true -> UQM1 = gb_trees:delete(QPid, UQM),
- State#ch{unconfirmed_qm = UQM1};
- false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
- State#ch{unconfirmed_qm = UQM1}
- end;
- none ->
- State
- end,
- Qs1 = gb_sets:del_element(QPid, Qs),
- %% If QPid somehow died initiating a nack, clear the message from
- %% internal data-structures. Also, cleanup empty entries.
- case (Nack orelse gb_sets:is_empty(Qs1)) of
- true -> UMQ1 = gb_trees:delete(MsgSeqNo, UMQ),
- {[{MsgSeqNo, XName} | MXs], State1#ch{unconfirmed_mq = UMQ1}};
- false -> UMQ1 = gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ),
- {MXs, State1#ch{unconfirmed_mq = UMQ1}}
- end.
+confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
+ {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
+ record_confirms(MXs, State#ch{unconfirmed = UC1}).
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -1152,22 +1115,13 @@ monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
true -> State
end.
-handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
- MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
- {value, MsgSet} -> gb_sets:to_list(MsgSet);
- none -> []
- end,
- %% We remove the MsgSeqNos from UQM before calling
- %% process_confirms to prevent each MsgSeqNo being removed from
- %% the set one by one which which would be inefficient
- State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
- {Nack, SendFun} =
- case rabbit_misc:is_abnormal_termination(Reason) of
- true -> {true, fun send_nacks/2};
- false -> {false, fun record_confirms/2}
- end,
- {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
- SendFun(MXs, State2).
+handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
+ case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> {MXs, UC1} = dtree:take_all(QPid, UC),
+ send_nacks(MXs, State#ch{unconfirmed = UC1});
+ false -> {MXs, UC1} = dtree:take(QPid, UC),
+ record_confirms(MXs, State#ch{unconfirmed = UC1})
+ end.
handle_consuming_queue_down(QPid,
State = #ch{consumer_mapping = ConsumerMapping,
@@ -1392,21 +1346,8 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
process_routing_result(routed, _, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
- #ch{unconfirmed_mq = UMQ} = State,
- UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ),
- SingletonSet = gb_sets:singleton(MsgSeqNo),
- lists:foldl(
- fun (QPid, State0 = #ch{unconfirmed_qm = UQM}) ->
- case gb_trees:lookup(QPid, UQM) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
- UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
- State0#ch{unconfirmed_qm = UQM1};
- none ->
- UQM1 = gb_trees:insert(QPid, SingletonSet, UQM),
- State0#ch{unconfirmed_qm = UQM1}
- end
- end, State#ch{unconfirmed_mq = UMQ1}, QPids).
+ State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
+ State#ch.unconfirmed)}.
send_nacks([], State) ->
State;
@@ -1444,11 +1385,11 @@ send_confirms(Cs, State) ->
end, State).
coalesce_and_send(MsgSeqNos, MkMsgFun,
- State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) ->
+ State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
- CutOff = case gb_trees:is_empty(UMQ) of
+ CutOff = case dtree:is_empty(UC) of
true -> lists:last(SMsgSeqNos) + 1;
- false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo
+ false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo
end,
{Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
case Ms of
@@ -1462,8 +1403,8 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
maybe_complete_tx(State = #ch{tx_status = in_progress}) ->
State;
-maybe_complete_tx(State = #ch{unconfirmed_mq = UMQ}) ->
- case gb_trees:is_empty(UMQ) of
+maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
+ case dtree:is_empty(UC) of
false -> State;
true -> complete_tx(State#ch{confirmed = []})
end.
@@ -1491,8 +1432,8 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
-i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) ->
- gb_trees:size(UMQ);
+i(messages_unconfirmed, #ch{unconfirmed = UC}) ->
+ dtree:size(UC);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
queue:len(UAMQ);
i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 83e28c44..910a89b4 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -242,6 +242,11 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
+%% Optimisation
+route(#exchange{name = #resource{name = <<"">>, virtual_host = VHost}},
+ #delivery{message = #basic_message{routing_keys = RKs}}) ->
+ [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
+
route(X = #exchange{name = XName}, Delivery) ->
route1(Delivery, {queue:from_list([X]), XName, []}).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 764c3764..04b7514f 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -173,11 +173,12 @@ dropwhile(Pred, MsgFun,
backing_queue = BQ,
set_delivered = SetDelivered,
backing_queue_state = BQS }) ->
- Len = BQ:len(BQS),
+ Len = BQ:len(BQS),
BQS1 = BQ:dropwhile(Pred, MsgFun, BQS),
- Dropped = Len - BQ:len(BQS1),
+ Len1 = BQ:len(BQS1),
+ ok = gm:broadcast(GM, {set_length, Len1}),
+ Dropped = Len - Len1,
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}),
State #state { backing_queue_state = BQS1,
set_delivered = SetDelivered1 }.
@@ -237,11 +238,11 @@ ack(AckTags, State = #state { gm = GM,
backing_queue_state = BQS,
ack_msg_id = AM }) ->
{MsgIds, BQS1} = BQ:ack(AckTags, BQS),
- AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
case MsgIds of
[] -> ok;
_ -> ok = gm:broadcast(GM, {ack, MsgIds})
end,
+ AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 98a80a26..0a51bcaa 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -846,7 +846,7 @@ process_instruction({ack, MsgIds},
process_instruction({fold, MsgFun, AckTags},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
- BQS1 = BQ:fold(AckTags, MsgFun, BQS),
+ BQS1 = BQ:fold(MsgFun, BQS, AckTags),
{ok, State #state { backing_queue_state = BQS1 }};
process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 39409d2f..c1be7613 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -46,8 +46,7 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
--export([dict_cons/3, orddict_cons/3, gb_trees_cons/3,
- gb_trees_set_insert/3]).
+-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
-export([gb_trees_fold/3, gb_trees_foreach/2]).
-export([get_options/2]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
@@ -177,7 +176,6 @@
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
-spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()).
--spec(gb_trees_set_insert/3 :: (any(), any(), gb_tree()) -> gb_tree()).
-spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A).
-spec(gb_trees_foreach/2 ::
(fun ((any(), any()) -> any()), gb_tree()) -> 'ok').
@@ -719,15 +717,6 @@ gb_trees_cons(Key, Value, Tree) ->
none -> gb_trees:insert(Key, [Value], Tree)
end.
-gb_trees_set_insert(Key, Value, Tree) ->
- case gb_trees:lookup(Key, Tree) of
- {value, Values} ->
- Values1 = gb_sets:insert(Value, Values),
- gb_trees:update(Key, Values1, Tree);
- none ->
- gb_trees:insert(Key, gb_sets:singleton(Value), Tree)
- end.
-
gb_trees_fold(Fun, Acc, Tree) ->
gb_trees_fold1(Fun, Acc, gb_trees:next(gb_trees:iterator(Tree))).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 55e4a6f8..e356d7ff 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -883,6 +883,8 @@ test_cluster_management() ->
"invalid2@invalid"]),
ok = assert_ram_node(),
+ ok = control_action(reset, []),
+
SecondaryNode = rabbit_nodes:make("hare"),
case net_adm:ping(SecondaryNode) of
pong -> passed = test_cluster_management2(SecondaryNode);
@@ -898,7 +900,6 @@ test_cluster_management2(SecondaryNode) ->
SecondaryNodeS = atom_to_list(SecondaryNode),
%% make a disk node
- ok = control_action(reset, []),
ok = control_action(cluster, [NodeS]),
ok = assert_disc_node(),
%% make a ram node
@@ -1244,6 +1245,9 @@ test_confirms() ->
},
rabbit_basic:build_content(
#'P_basic'{delivery_mode = 2}, <<"">>)),
+ %% We must not kill the queue before the channel has processed the
+ %% 'publish'.
+ ok = rabbit_channel:flush(Ch),
%% Crash the queue
QPid1 ! boom,
%% Wait for a nack