diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-06-12 16:36:18 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-06-12 16:36:18 +0100 |
commit | 8992b1b5a7dc9eca08501cf390f077b8e4b893ab (patch) | |
tree | 6d1287f5a021eebf4ac76ef6a5551d40cde57a63 /deps/rabbitmq_federation | |
parent | 177c8b18854f0f753caa4816a561e2fe8a50e59c (diff) | |
download | rabbitmq-server-git-8992b1b5a7dc9eca08501cf390f077b8e4b893ab.tar.gz |
Update headers when message forwarded.
Diffstat (limited to 'deps/rabbitmq_federation')
-rw-r--r-- | deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl | 56 | ||||
-rw-r--r-- | deps/rabbitmq_federation/src/rabbit_federation_upstream.erl | 7 |
2 files changed, 50 insertions, 13 deletions
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl b/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl index c5f0d614c7..ee1a4ed9b8 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl @@ -147,19 +147,19 @@ handle_info(#'basic.nack'{} = Nack, State = #state{ch = Ch, Unacked1 = rabbit_federation_link_util:nack(Nack, Ch, Unacked), {noreply, State#state{unacked = Unacked1}}; -handle_info({#'basic.deliver'{} = DeliverMethod, Msg}, - State = #state{queue = #amqqueue{name = QName}, - upstream = Upstream, - run = Run, - credit = Credit, - ctag = CTag, - ch = Ch, - dch = DCh, - unacked = Unacked}) -> +handle_info({#'basic.deliver'{redelivered = Redelivered} = DeliverMethod, Msg}, + State = #state{queue = #amqqueue{name = QName}, + upstream = Upstream, + upstream_params = UParams, + run = Run, + credit = Credit, + ctag = CTag, + ch = Ch, + dch = DCh, + unacked = Unacked}) -> PublishMethod = #'basic.publish'{exchange = <<"">>, routing_key = QName#resource.name}, - %% TODO Actually update the headers - HeadersFun = fun (H) -> H end, + HeadersFun = fun (H) -> update_headers(UParams, Redelivered, H) end, ForwardFun = fun (_H) -> true end, Unacked1 = rabbit_federation_link_util:forward( Upstream, DeliverMethod, Ch, DCh, PublishMethod, @@ -253,3 +253,37 @@ go(S0 = #not_started{run = Run, upstream_params = UParams, unacked = Unacked}} end, Upstream, UParams, QName, S0). + +update_headers(UParams, Redelivered, Headers) -> + {table, Info} = rabbit_federation_upstream:params_to_table(UParams), + {Headers1, Count} = + case rabbit_misc:table_lookup(Headers, ?ROUTING_HEADER) of + undefined -> + {Headers, 0}; + {array, Been} -> + {Found, Been1} = lists:partition( + fun (I) -> visit_match(I, Info) end, + Been), + C = case Found of + [] -> 0; + [{table, T}] -> case rabbit_misc:table_lookup( + T, <<"visit-count">>) of + {_, I} when is_number(I) -> I; + _ -> 0 + end + end, + {rabbit_misc:set_table_value( + Headers, ?ROUTING_HEADER, array, Been1), C} + end, + rabbit_basic:prepend_table_header( + ?ROUTING_HEADER, Info ++ [{<<"redelivered">>, bool, Redelivered}, + {<<"visit-count">>, long, Count + 1}], + Headers1). + +visit_match({table, T}, Info) -> + lists:all(fun (K) -> + rabbit_misc:table_lookup(T, K) =:= + rabbit_misc:table_lookup(Info, K) + end, [<<"uri">>, <<"virtual_host">>, <<"queue">>]); +visit_match(_ ,_) -> + false. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl index 4dd3a63655..fe2d3d197b 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl @@ -46,10 +46,13 @@ for(XorQ, UpstreamName) -> params_to_table(#upstream_params{uri = URI, params = Params, x_or_q = XorQ}) -> + Key = case XorQ of + #exchange{} -> <<"exchange">>; + #amqqueue{} -> <<"queue">> + end, {table, [{<<"uri">>, longstr, remove_credentials(URI)}, {<<"virtual_host">>, longstr, vhost(Params)}, - %% TODO derp - {<<"exchange">>, longstr, name(XorQ)}]}. + {Key, longstr, name(XorQ)}]}. params_to_string(#upstream_params{uri = URI, x_or_q = XorQ}) -> |