summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2011-07-08 16:01:17 +0100
committerRob Harrop <rob@rabbitmq.com>2011-07-08 16:01:17 +0100
commitcbcc208629db5bac9d26862a481aa079d8e89478 (patch)
treea3dbb1ae0e569df76503b508f57e2bf981c42036
parent8f136d8ead5b9a9ce9bea51eb1535dee065227ad (diff)
parent4d84dec33516e7a1b028d476c394047f7bbf7e57 (diff)
downloadrabbitmq-server-cbcc208629db5bac9d26862a481aa079d8e89478.tar.gz
Merge bug24216 into default
-rw-r--r--src/rabbit_binding.erl46
-rw-r--r--src/rabbit_channel.erl26
-rw-r--r--src/rabbit_upgrade_functions.erl11
3 files changed, 44 insertions, 39 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 5873537c..205d5bba 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -94,8 +94,6 @@
routing_key, arguments]).
recover(XNames, QNames) ->
- XNameSet = sets:from_list(XNames),
- QNameSet = sets:from_list(QNames),
rabbit_misc:table_filter(
fun (Route) ->
mnesia:read({rabbit_semi_durable_route, Route}) =:= []
@@ -105,27 +103,33 @@ recover(XNames, QNames) ->
(_Route, false) ->
ok
end, rabbit_durable_route),
- rabbit_misc:table_filter(
- fun (#route{binding = #binding{destination = Dst =
- #resource{kind = Kind}}}) ->
- sets:is_element(Dst, case Kind of
- exchange -> XNameSet;
- queue -> QNameSet
- end)
- end,
- fun (R = #route{binding = B = #binding{source = Src}}, Tx) ->
- {ok, X} = rabbit_exchange:lookup(Src),
- Serial = case Tx of
- true -> ok = sync_transient_route(
- R, fun mnesia:write/3),
- transaction;
- false -> rabbit_exchange:serial(X)
- end,
- rabbit_exchange:callback(X, add_binding, [Serial, X, B])
- end,
- rabbit_semi_durable_route),
+ XNameSet = sets:from_list(XNames),
+ QNameSet = sets:from_list(QNames),
+ SelectSet = fun (#resource{kind = exchange}) -> XNameSet;
+ (#resource{kind = queue}) -> QNameSet
+ end,
+ [recover_semi_durable_route(R, SelectSet(Dst)) ||
+ R = #route{binding = #binding{destination = Dst}} <-
+ rabbit_misc:dirty_read_all(rabbit_semi_durable_route)],
ok.
+recover_semi_durable_route(R = #route{binding = B}, ToRecover) ->
+ #binding{source = Src, destination = Dst} = B,
+ {ok, X} = rabbit_exchange:lookup(Src),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ Rs = mnesia:match_object(rabbit_semi_durable_route, R, read),
+ case Rs =/= [] andalso sets:is_element(Dst, ToRecover) of
+ false -> no_recover;
+ true -> ok = sync_transient_route(R, fun mnesia:write/3),
+ rabbit_exchange:serial(X)
+ end
+ end,
+ fun (no_recover, _) -> ok;
+ (_Serial, true) -> x_callback(transaction, X, add_binding, B);
+ (Serial, false) -> x_callback(Serial, X, add_binding, B)
+ end).
+
exists(Binding) ->
binding_action(
Binding, fun (_Src, _Dst, B) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 84728980..8310bd8e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -287,12 +287,11 @@ handle_cast({deliver, ConsumerTag, AckRequired,
exchange = ExchangeName#resource.name,
routing_key = RoutingKey},
rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
-
- maybe_incr_stats([{QPid, 1}],
- case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State),
+ maybe_incr_stats([{QPid, 1}], case AckRequired of
+ true -> deliver;
+ false -> deliver_no_ack
+ end, State),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State),
rabbit_trace:tap_trace_out(Msg, TraceState),
noreply(State1#ch{next_tag = DeliveryTag + 1});
@@ -680,11 +679,11 @@ handle_method(#'basic.get'{queue = QueueNameBin,
State1 = lock_message(not(NoAck),
ack_record(DeliveryTag, none, Msg),
State),
- maybe_incr_stats([{QPid, 1}],
- case NoAck of
- true -> get_no_ack;
- false -> get
- end, State),
+ maybe_incr_stats([{QPid, 1}], case NoAck of
+ true -> get_no_ack;
+ false -> get
+ end, State),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State),
rabbit_trace:tap_trace_out(Msg, TraceState),
ok = rabbit_writer:send_command(
WriterPid,
@@ -1449,6 +1448,11 @@ i(client_flow_blocked, #ch{limiter_pid = LimiterPid}) ->
i(Item, _) ->
throw({bad_argument, Item}).
+maybe_incr_redeliver_stats(true, QPid, State) ->
+ maybe_incr_stats([{QPid, 1}], redeliver, State);
+maybe_incr_redeliver_stats(_, _, _) ->
+ ok.
+
maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) ->
case rabbit_event:stats_level(StatsTimer) of
fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index acf45bf3..8d26866b 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -16,7 +16,8 @@
-module(rabbit_upgrade_functions).
--include("rabbit.hrl").
+%% If you are tempted to add include("rabbit.hrl"). here, don't. Using record
+%% defs here leads to pain later.
-compile([export_all]).
@@ -190,11 +191,7 @@ create(Tab, TabDef) ->
%% the exchange type registry or worker pool to be running by dint of
%% not validating anything and assuming the exchange type does not
%% require serialisation.
+%% NB: this assumes the pre-exchange-scratch-space format
declare_exchange(XName, Type) ->
- X = #exchange{name = XName,
- type = Type,
- durable = true,
- auto_delete = false,
- internal = false,
- arguments = []},
+ X = {exchange, XName, Type, true, false, false, []},
ok = mnesia:dirty_write(rabbit_durable_exchange, X).