summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_federation
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-06-12 16:36:18 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-06-12 16:36:18 +0100
commit8992b1b5a7dc9eca08501cf390f077b8e4b893ab (patch)
tree6d1287f5a021eebf4ac76ef6a5551d40cde57a63 /deps/rabbitmq_federation
parent177c8b18854f0f753caa4816a561e2fe8a50e59c (diff)
downloadrabbitmq-server-git-8992b1b5a7dc9eca08501cf390f077b8e4b893ab.tar.gz
Update headers when message forwarded.
Diffstat (limited to 'deps/rabbitmq_federation')
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl56
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_upstream.erl7
2 files changed, 50 insertions, 13 deletions
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl b/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl
index c5f0d614c7..ee1a4ed9b8 100644
--- a/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl
+++ b/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl
@@ -147,19 +147,19 @@ handle_info(#'basic.nack'{} = Nack, State = #state{ch = Ch,
Unacked1 = rabbit_federation_link_util:nack(Nack, Ch, Unacked),
{noreply, State#state{unacked = Unacked1}};
-handle_info({#'basic.deliver'{} = DeliverMethod, Msg},
- State = #state{queue = #amqqueue{name = QName},
- upstream = Upstream,
- run = Run,
- credit = Credit,
- ctag = CTag,
- ch = Ch,
- dch = DCh,
- unacked = Unacked}) ->
+handle_info({#'basic.deliver'{redelivered = Redelivered} = DeliverMethod, Msg},
+ State = #state{queue = #amqqueue{name = QName},
+ upstream = Upstream,
+ upstream_params = UParams,
+ run = Run,
+ credit = Credit,
+ ctag = CTag,
+ ch = Ch,
+ dch = DCh,
+ unacked = Unacked}) ->
PublishMethod = #'basic.publish'{exchange = <<"">>,
routing_key = QName#resource.name},
- %% TODO Actually update the headers
- HeadersFun = fun (H) -> H end,
+ HeadersFun = fun (H) -> update_headers(UParams, Redelivered, H) end,
ForwardFun = fun (_H) -> true end,
Unacked1 = rabbit_federation_link_util:forward(
Upstream, DeliverMethod, Ch, DCh, PublishMethod,
@@ -253,3 +253,37 @@ go(S0 = #not_started{run = Run,
upstream_params = UParams,
unacked = Unacked}}
end, Upstream, UParams, QName, S0).
+
+update_headers(UParams, Redelivered, Headers) ->
+ {table, Info} = rabbit_federation_upstream:params_to_table(UParams),
+ {Headers1, Count} =
+ case rabbit_misc:table_lookup(Headers, ?ROUTING_HEADER) of
+ undefined ->
+ {Headers, 0};
+ {array, Been} ->
+ {Found, Been1} = lists:partition(
+ fun (I) -> visit_match(I, Info) end,
+ Been),
+ C = case Found of
+ [] -> 0;
+ [{table, T}] -> case rabbit_misc:table_lookup(
+ T, <<"visit-count">>) of
+ {_, I} when is_number(I) -> I;
+ _ -> 0
+ end
+ end,
+ {rabbit_misc:set_table_value(
+ Headers, ?ROUTING_HEADER, array, Been1), C}
+ end,
+ rabbit_basic:prepend_table_header(
+ ?ROUTING_HEADER, Info ++ [{<<"redelivered">>, bool, Redelivered},
+ {<<"visit-count">>, long, Count + 1}],
+ Headers1).
+
+visit_match({table, T}, Info) ->
+ lists:all(fun (K) ->
+ rabbit_misc:table_lookup(T, K) =:=
+ rabbit_misc:table_lookup(Info, K)
+ end, [<<"uri">>, <<"virtual_host">>, <<"queue">>]);
+visit_match(_ ,_) ->
+ false.
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
index 4dd3a63655..fe2d3d197b 100644
--- a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
+++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
@@ -46,10 +46,13 @@ for(XorQ, UpstreamName) ->
params_to_table(#upstream_params{uri = URI,
params = Params,
x_or_q = XorQ}) ->
+ Key = case XorQ of
+ #exchange{} -> <<"exchange">>;
+ #amqqueue{} -> <<"queue">>
+ end,
{table, [{<<"uri">>, longstr, remove_credentials(URI)},
{<<"virtual_host">>, longstr, vhost(Params)},
- %% TODO derp
- {<<"exchange">>, longstr, name(XorQ)}]}.
+ {Key, longstr, name(XorQ)}]}.
params_to_string(#upstream_params{uri = URI,
x_or_q = XorQ}) ->