diff options
author | Rob Harrop <rob@rabbitmq.com> | 2011-07-08 16:01:17 +0100 |
---|---|---|
committer | Rob Harrop <rob@rabbitmq.com> | 2011-07-08 16:01:17 +0100 |
commit | cbcc208629db5bac9d26862a481aa079d8e89478 (patch) | |
tree | a3dbb1ae0e569df76503b508f57e2bf981c42036 | |
parent | 8f136d8ead5b9a9ce9bea51eb1535dee065227ad (diff) | |
parent | 4d84dec33516e7a1b028d476c394047f7bbf7e57 (diff) | |
download | rabbitmq-server-cbcc208629db5bac9d26862a481aa079d8e89478.tar.gz |
Merge bug24216 into default
-rw-r--r-- | src/rabbit_binding.erl | 46 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 26 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 11 |
3 files changed, 44 insertions, 39 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 5873537c..205d5bba 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -94,8 +94,6 @@ routing_key, arguments]). recover(XNames, QNames) -> - XNameSet = sets:from_list(XNames), - QNameSet = sets:from_list(QNames), rabbit_misc:table_filter( fun (Route) -> mnesia:read({rabbit_semi_durable_route, Route}) =:= [] @@ -105,27 +103,33 @@ recover(XNames, QNames) -> (_Route, false) -> ok end, rabbit_durable_route), - rabbit_misc:table_filter( - fun (#route{binding = #binding{destination = Dst = - #resource{kind = Kind}}}) -> - sets:is_element(Dst, case Kind of - exchange -> XNameSet; - queue -> QNameSet - end) - end, - fun (R = #route{binding = B = #binding{source = Src}}, Tx) -> - {ok, X} = rabbit_exchange:lookup(Src), - Serial = case Tx of - true -> ok = sync_transient_route( - R, fun mnesia:write/3), - transaction; - false -> rabbit_exchange:serial(X) - end, - rabbit_exchange:callback(X, add_binding, [Serial, X, B]) - end, - rabbit_semi_durable_route), + XNameSet = sets:from_list(XNames), + QNameSet = sets:from_list(QNames), + SelectSet = fun (#resource{kind = exchange}) -> XNameSet; + (#resource{kind = queue}) -> QNameSet + end, + [recover_semi_durable_route(R, SelectSet(Dst)) || + R = #route{binding = #binding{destination = Dst}} <- + rabbit_misc:dirty_read_all(rabbit_semi_durable_route)], ok. +recover_semi_durable_route(R = #route{binding = B}, ToRecover) -> + #binding{source = Src, destination = Dst} = B, + {ok, X} = rabbit_exchange:lookup(Src), + rabbit_misc:execute_mnesia_transaction( + fun () -> + Rs = mnesia:match_object(rabbit_semi_durable_route, R, read), + case Rs =/= [] andalso sets:is_element(Dst, ToRecover) of + false -> no_recover; + true -> ok = sync_transient_route(R, fun mnesia:write/3), + rabbit_exchange:serial(X) + end + end, + fun (no_recover, _) -> ok; + (_Serial, true) -> x_callback(transaction, X, add_binding, B); + (Serial, false) -> x_callback(Serial, X, add_binding, B) + end). + exists(Binding) -> binding_action( Binding, fun (_Src, _Dst, B) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 84728980..8310bd8e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -287,12 +287,11 @@ handle_cast({deliver, ConsumerTag, AckRequired, exchange = ExchangeName#resource.name, routing_key = RoutingKey}, rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content), - - maybe_incr_stats([{QPid, 1}], - case AckRequired of - true -> deliver; - false -> deliver_no_ack - end, State), + maybe_incr_stats([{QPid, 1}], case AckRequired of + true -> deliver; + false -> deliver_no_ack + end, State), + maybe_incr_redeliver_stats(Redelivered, QPid, State), rabbit_trace:tap_trace_out(Msg, TraceState), noreply(State1#ch{next_tag = DeliveryTag + 1}); @@ -680,11 +679,11 @@ handle_method(#'basic.get'{queue = QueueNameBin, State1 = lock_message(not(NoAck), ack_record(DeliveryTag, none, Msg), State), - maybe_incr_stats([{QPid, 1}], - case NoAck of - true -> get_no_ack; - false -> get - end, State), + maybe_incr_stats([{QPid, 1}], case NoAck of + true -> get_no_ack; + false -> get + end, State), + maybe_incr_redeliver_stats(Redelivered, QPid, State), rabbit_trace:tap_trace_out(Msg, TraceState), ok = rabbit_writer:send_command( WriterPid, @@ -1449,6 +1448,11 @@ i(client_flow_blocked, #ch{limiter_pid = LimiterPid}) -> i(Item, _) -> throw({bad_argument, Item}). +maybe_incr_redeliver_stats(true, QPid, State) -> + maybe_incr_stats([{QPid, 1}], redeliver, State); +maybe_incr_redeliver_stats(_, _, _) -> + ok. + maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) -> case rabbit_event:stats_level(StatsTimer) of fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs]; diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index acf45bf3..8d26866b 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -16,7 +16,8 @@ -module(rabbit_upgrade_functions). --include("rabbit.hrl"). +%% If you are tempted to add include("rabbit.hrl"). here, don't. Using record +%% defs here leads to pain later. -compile([export_all]). @@ -190,11 +191,7 @@ create(Tab, TabDef) -> %% the exchange type registry or worker pool to be running by dint of %% not validating anything and assuming the exchange type does not %% require serialisation. +%% NB: this assumes the pre-exchange-scratch-space format declare_exchange(XName, Type) -> - X = #exchange{name = XName, - type = Type, - durable = true, - auto_delete = false, - internal = false, - arguments = []}, + X = {exchange, XName, Type, true, false, false, []}, ok = mnesia:dirty_write(rabbit_durable_exchange, X). |