diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-10-29 17:31:42 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-10-29 17:31:42 +0000 |
commit | dfb74e689ea5150e310a78f95a470ea8e3cca310 (patch) | |
tree | 5848931503d422104a971c9133b31cad04942194 | |
parent | 2e97201381e9817035ad3025b717d120fc8d7c7e (diff) | |
parent | 463230c0a3cb2f6483e024bd06fb768320706ec4 (diff) | |
download | rabbitmq-server-dfb74e689ea5150e310a78f95a470ea8e3cca310.tar.gz |
Merge bug25244
-rw-r--r-- | include/rabbit.hrl | 1 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 48 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 74 |
4 files changed, 116 insertions, 11 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index adbb6102..b2832b45 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -110,5 +110,6 @@ -define(DESIRED_HIBERNATE, 10000). -define(CREDIT_DISC_BOUND, {2000, 500}). +-define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>). -define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). -define(DELETED_HEADER, <<"BCC">>). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8d05a78c..ec5ef3f8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -853,8 +853,8 @@ make_dead_letter_msg(Reason, {<<"time">>, timestamp, TimeSec}, {<<"exchange">>, longstr, Exchange#resource.name}, {<<"routing-keys">>, array, RKs1}], - HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>, - Info, Headers)) + HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>, + Info, Headers)) end, Content1 = rabbit_basic:map_headers(HeadersFun2, Content), Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(), diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index db2b7e95..9966c0df 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -19,7 +19,7 @@ -include("rabbit_framing.hrl"). -export([publish/4, publish/5, publish/1, - message/3, message/4, properties/1, append_table_header/3, + message/3, message/4, properties/1, prepend_table_header/3, extract_headers/1, map_headers/2, delivery/3, header_routes/1]). -export([build_content/2, from_content/1]). @@ -58,7 +58,7 @@ -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). --spec(append_table_header/3 :: +-spec(prepend_table_header/3 :: (binary(), rabbit_framing:amqp_table(), headers()) -> headers()). -spec(extract_headers/1 :: (rabbit_types:content()) -> headers()). @@ -177,15 +177,45 @@ properties(P) when is_list(P) -> end end, #'P_basic'{}, P). -append_table_header(Name, Info, undefined) -> - append_table_header(Name, Info, []); -append_table_header(Name, Info, Headers) -> - Prior = case rabbit_misc:table_lookup(Headers, Name) of - undefined -> []; - {array, Existing} -> Existing - end, +prepend_table_header(Name, Info, undefined) -> + prepend_table_header(Name, Info, []); +prepend_table_header(Name, Info, Headers) -> + case rabbit_misc:table_lookup(Headers, Name) of + {array, Existing} -> + prepend_table(Name, Info, Existing, Headers); + undefined -> + prepend_table(Name, Info, [], Headers); + Other -> + Headers2 = prepend_table(Name, Info, [], Headers), + set_invalid_header(Name, Other, Headers2) + end. + +prepend_table(Name, Info, Prior, Headers) -> rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]). +set_invalid_header(Name, {_, _}=Value, Headers) when is_list(Headers) -> + case rabbit_misc:table_lookup(Headers, ?INVALID_HEADERS_KEY) of + undefined -> + set_invalid([{Name, array, [Value]}], Headers); + {table, ExistingHdr} -> + update_invalid(Name, Value, ExistingHdr, Headers); + Other -> + %% somehow the x-invalid-headers header is corrupt + Invalid = [{?INVALID_HEADERS_KEY, array, [Other]}], + set_invalid_header(Name, Value, set_invalid(Invalid, Headers)) + end. + +set_invalid(NewHdr, Headers) -> + rabbit_misc:set_table_value(Headers, ?INVALID_HEADERS_KEY, table, NewHdr). + +update_invalid(Name, Value, ExistingHdr, Header) -> + Values = case rabbit_misc:table_lookup(ExistingHdr, Name) of + undefined -> [Value]; + {array, Prior} -> [Value | Prior] + end, + NewHdr = rabbit_misc:set_table_value(ExistingHdr, Name, array, Values), + set_invalid(NewHdr, Header). + extract_headers(Content) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index afc10b9f..51ca6043 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -41,6 +41,7 @@ all_tests() -> passed = test_multi_call(), passed = test_file_handle_cache(), passed = test_backing_queue(), + passed = test_rabbit_basic_header_handling(), passed = test_priority_queue(), passed = test_pg_local(), passed = test_unfold(), @@ -71,6 +72,7 @@ all_tests() -> passed = test_configurable_server_properties(), passed. + do_if_secondary_node(Up, Down) -> SecondaryNode = rabbit_nodes:make("hare"), @@ -159,6 +161,78 @@ test_multi_call() -> exit(Pid3, bang), passed. +test_rabbit_basic_header_handling() -> + passed = write_table_with_invalid_existing_type_test(), + passed = invalid_existing_headers_test(), + passed = disparate_invalid_header_entries_accumulate_separately_test(), + passed = corrupt_or_invalid_headers_are_overwritten_test(), + passed = invalid_same_header_entry_accumulation_test(), + passed. + +-define(XDEATH_TABLE, + [{<<"reason">>, longstr, <<"blah">>}, + {<<"queue">>, longstr, <<"foo.bar.baz">>}, + {<<"exchange">>, longstr, <<"my-exchange">>}, + {<<"routing-keys">>, array, []}]). + +-define(ROUTE_TABLE, [{<<"redelivered">>, bool, <<"true">>}]). + +-define(BAD_HEADER(K), {<<K>>, longstr, <<"bad ", K>>}). +-define(BAD_HEADER(K, Suf), {<<K>>, longstr, <<"bad ", K, Suf>>}). +-define(FOUND_BAD_HEADER(K), {<<K>>, array, [{longstr, <<"bad ", K>>}]}). + +write_table_with_invalid_existing_type_test() -> + prepend_check(<<"header1">>, ?XDEATH_TABLE, [?BAD_HEADER("header1")]), + passed. + +invalid_existing_headers_test() -> + Headers = + prepend_check(<<"header2">>, ?ROUTE_TABLE, [?BAD_HEADER("header2")]), + {array, [{table, ?ROUTE_TABLE}]} = + rabbit_misc:table_lookup(Headers, <<"header2">>), + passed. + +disparate_invalid_header_entries_accumulate_separately_test() -> + BadHeaders = [?BAD_HEADER("header2")], + Headers = prepend_check(<<"header2">>, ?ROUTE_TABLE, BadHeaders), + Headers2 = prepend_check(<<"header1">>, ?XDEATH_TABLE, + [?BAD_HEADER("header1") | Headers]), + {table, [?FOUND_BAD_HEADER("header1"), + ?FOUND_BAD_HEADER("header2")]} = + rabbit_misc:table_lookup(Headers2, ?INVALID_HEADERS_KEY), + passed. + +corrupt_or_invalid_headers_are_overwritten_test() -> + Headers0 = [?BAD_HEADER("header1"), + ?BAD_HEADER("x-invalid-headers")], + Headers1 = prepend_check(<<"header1">>, ?XDEATH_TABLE, Headers0), + {table,[?FOUND_BAD_HEADER("header1"), + ?FOUND_BAD_HEADER("x-invalid-headers")]} = + rabbit_misc:table_lookup(Headers1, ?INVALID_HEADERS_KEY), + passed. + +invalid_same_header_entry_accumulation_test() -> + BadHeader1 = ?BAD_HEADER("header1", "a"), + Headers = prepend_check(<<"header1">>, ?ROUTE_TABLE, [BadHeader1]), + Headers2 = prepend_check(<<"header1">>, ?ROUTE_TABLE, + [?BAD_HEADER("header1", "b") | Headers]), + {table, InvalidHeaders} = + rabbit_misc:table_lookup(Headers2, ?INVALID_HEADERS_KEY), + {array, [{longstr,<<"bad header1b">>}, + {longstr,<<"bad header1a">>}]} = + rabbit_misc:table_lookup(InvalidHeaders, <<"header1">>), + passed. + +prepend_check(HeaderKey, HeaderTable, Headers) -> + Headers1 = rabbit_basic:prepend_table_header( + HeaderKey, HeaderTable, Headers), + {table, Invalid} = + rabbit_misc:table_lookup(Headers1, ?INVALID_HEADERS_KEY), + {Type, Value} = rabbit_misc:table_lookup(Headers, HeaderKey), + {array, [{Type, Value} | _]} = + rabbit_misc:table_lookup(Invalid, HeaderKey), + Headers1. + test_priority_queue() -> false = priority_queue:is_queue(not_a_queue), |