diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_basic.erl | 31 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 3 | ||||
-rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 2 | ||||
-rw-r--r-- | src/rabbit_msg_file.erl | 24 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 41 | ||||
-rw-r--r-- | src/rabbit_router.erl | 17 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 32 |
7 files changed, 77 insertions, 73 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 503f01bc..376a303e 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -31,7 +31,6 @@ -type(publish_result() :: ({ok, rabbit_router:routing_result(), [pid()]} | rabbit_types:error('not_found'))). --type(msg_or_error() :: {'ok', rabbit_types:message()} | {'error', any()}). -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). @@ -41,10 +40,11 @@ rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), - properties_input(), binary()) -> msg_or_error()). + properties_input(), binary()) -> rabbit_types:message()). -spec(message/3 :: (rabbit_exchange:name(), rabbit_router:routing_key(), - rabbit_types:decoded_content()) -> msg_or_error()). + rabbit_types:decoded_content()) -> {'ok', rabbit_types:message()} | + {'error', any()}). -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). -spec(publish/4 :: @@ -98,17 +98,19 @@ from_content(Content) -> {Props, list_to_binary(lists:reverse(FragmentsRev))}. %% This breaks the spec rule forbidding message modification +strip_header(#content{properties = #'P_basic'{headers = undefined}} + = DecodedContent, _Key) -> + DecodedContent; strip_header(#content{properties = Props = #'P_basic'{headers = Headers}} - = DecodedContent, Key) when Headers =/= undefined -> - case lists:keyfind(Key, 1, Headers) of - false -> DecodedContent; - Found -> Headers0 = lists:delete(Found, Headers), - rabbit_binary_generator:clear_encoded_content( - DecodedContent#content{ - properties = Props#'P_basic'{headers = Headers0}}) - end; -strip_header(DecodedContent, _Key) -> - DecodedContent. + = DecodedContent, Key) -> + case lists:keysearch(Key, 1, Headers) of + false -> DecodedContent; + {value, Found} -> Headers0 = lists:delete(Found, Headers), + rabbit_binary_generator:clear_encoded_content( + DecodedContent#content{ + properties = Props#'P_basic'{ + headers = Headers0}}) + end. message(ExchangeName, RoutingKey, #content{properties = Props} = DecodedContent) -> @@ -170,7 +172,7 @@ is_message_persistent(#content{properties = #'P_basic'{ 1 -> false; 2 -> true; undefined -> false; - _ -> false + Other -> throw({error, {delivery_mode_unknown, Other}}) end. % Extract CC routes from headers @@ -185,4 +187,3 @@ header_routes(HeadersTable) -> Type, binary_to_list(HeaderKey)}}) end || HeaderKey <- ?ROUTING_HEADERS]). - diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 82776c4a..349c2f6e 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -37,8 +37,7 @@ description() -> route(#exchange{name = Name}, #delivery{message = #basic_message{routing_keys = Routes}}) -> - lists:append([rabbit_router:match_routing_key(Name, RKey) || - RKey <- Routes]). + rabbit_router:match_routing_key(Name, Routes). validate(_X) -> ok. create(_Tx, _X) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 382fb627..bc5293c8 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -36,7 +36,7 @@ description() -> {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. route(#exchange{name = Name}, _Delivery) -> - rabbit_router:match_routing_key(Name, '_'). + rabbit_router:match_routing_key(Name, ['_']). validate(_X) -> ok. create(_Tx, _X) -> ok. diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index 81f2f07e..55e6ac47 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -80,28 +80,28 @@ read(FileHdl, TotalSize) -> end. scan(FileHdl, FileSize, Fun, Acc) when FileSize >= 0 -> - scan(FileHdl, FileSize, <<>>, 0, Acc, 0, Fun). + scan(FileHdl, FileSize, <<>>, 0, 0, Fun, Acc). -scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset, _Fun) -> +scan(_FileHdl, FileSize, _Data, FileSize, ScanOffset, _Fun, Acc) -> {ok, Acc, ScanOffset}; -scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset, Fun) -> +scan(FileHdl, FileSize, Data, ReadOffset, ScanOffset, Fun, Acc) -> Read = lists:min([?SCAN_BLOCK_SIZE, (FileSize - ReadOffset)]), case file_handle_cache:read(FileHdl, Read) of {ok, Data1} -> {Data2, Acc1, ScanOffset1} = - scanner(<<Data/binary, Data1/binary>>, Acc, ScanOffset, Fun), + scanner(<<Data/binary, Data1/binary>>, ScanOffset, Fun, Acc), ReadOffset1 = ReadOffset + size(Data1), - scan(FileHdl, FileSize, Data2, ReadOffset1, Acc1, ScanOffset1, Fun); + scan(FileHdl, FileSize, Data2, ReadOffset1, ScanOffset1, Fun, Acc1); _KO -> {ok, Acc, ScanOffset} end. -scanner(<<>>, Acc, Offset, _Fun) -> +scanner(<<>>, Offset, _Fun, Acc) -> {<<>>, Acc, Offset}; -scanner(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset, _Fun) -> +scanner(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Offset, _Fun, Acc) -> {<<>>, Acc, Offset}; %% Nothing to do other than stop. scanner(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, - WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset, Fun) -> + WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Offset, Fun, Acc) -> TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, case WriteMarker of ?WRITE_OK_MARKER -> @@ -113,10 +113,10 @@ scanner(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, <<GuidNum:?GUID_SIZE_BITS, Msg/binary>> = <<GuidAndMsg:Size/binary>>, <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>, - scanner(Rest, Fun({Guid, TotalSize, Offset, Msg}, Acc), - Offset + TotalSize, Fun); + scanner(Rest, Offset + TotalSize, Fun, + Fun({Guid, TotalSize, Offset, Msg}, Acc)); _ -> - scanner(Rest, Acc, Offset + TotalSize, Fun) + scanner(Rest, Offset + TotalSize, Fun, Acc) end; -scanner(Data, Acc, Offset, _Fun) -> +scanner(Data, Offset, _Fun, Acc) -> {Data, Acc, Offset}. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index a2f6d7e2..d798c4f7 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -26,7 +26,7 @@ -export([sync/1, set_maximum_since_use/2, has_readers/2, combine_files/3, delete_file/2]). %% internal --export([transform_dir/3, force_recovery/2]). %% upgrade +-export([multiple_routing_keys/0]). %% upgrade -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]). @@ -106,6 +106,8 @@ %%---------------------------------------------------------------------------- +-rabbit_upgrade({multiple_routing_keys, []}). + -ifdef(use_specs). -export_type([gc_state/0, file_num/0]). @@ -164,9 +166,7 @@ -spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) -> deletion_thunk()). -spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> deletion_thunk()). --spec(force_recovery/2 :: (file:filename(), server()) -> 'ok'). --spec(transform_dir/3 :: (file:filename(), server(), - fun ((binary()) -> ({'ok', msg()} | {error, any()}))) -> 'ok'). +-spec(multiple_routing_keys/0 :: () -> 'ok'). -endif. @@ -1968,6 +1968,25 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, {destination, Destination}]} end. +%%---------------------------------------------------------------------------- +%% upgrade +%%---------------------------------------------------------------------------- + +multiple_routing_keys() -> + [transform_store( + fun ({basic_message, ExchangeName, Routing_Key, Content, + Guid, Persistent}) -> + {ok, {basic_message, ExchangeName, [Routing_Key], Content, + Guid, Persistent}}; + (_) -> {error, corrupt_message} + end, Store) || Store <- rabbit_variable_queue:store_names()], + ok. + +%% Assumes message store is not running +transform_store(TransformFun, Store) -> + force_recovery(rabbit_mnesia:dir(), Store), + transform_dir(rabbit_mnesia:dir(), Store, TransformFun). + force_recovery(BaseDir, Store) -> Dir = filename:join(BaseDir, atom_to_list(Store)), file:delete(filename:join(Dir, ?CLEAN_FILENAME)), @@ -1975,10 +1994,10 @@ force_recovery(BaseDir, Store) -> File <- list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP)], ok. -for_each_file(D, Fun, Files) -> +foreach_file(D, Fun, Files) -> [Fun(filename:join(D, File)) || File <- Files]. -for_each_file(D1, D2, Fun, Files) -> +foreach_file(D1, D2, Fun, Files) -> [Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files]. transform_dir(BaseDir, Store, TransformFun) -> @@ -1988,11 +2007,11 @@ transform_dir(BaseDir, Store, TransformFun) -> case filelib:is_dir(TmpDir) of true -> throw({error, transform_failed_previously}); false -> OldFileList = list_sorted_file_names(Dir, ?FILE_EXTENSION), - for_each_file(Dir, TmpDir, TransformFile, OldFileList), - for_each_file(Dir, fun file:delete/1, OldFileList), + foreach_file(Dir, TmpDir, TransformFile, OldFileList), + foreach_file(Dir, fun file:delete/1, OldFileList), NewFileList = list_sorted_file_names(TmpDir, ?FILE_EXTENSION), - for_each_file(TmpDir, Dir, fun file:copy/2, NewFileList), - for_each_file(TmpDir, fun file:delete/1, NewFileList), + foreach_file(TmpDir, Dir, fun file:copy/2, NewFileList), + foreach_file(TmpDir, fun file:delete/1, NewFileList), ok = file:del_dir(TmpDir) end. @@ -2007,7 +2026,7 @@ transform_msg_file(FileOld, FileNew, TransformFun) -> rabbit_msg_file:scan( RefOld, Size, fun({Guid, _Size, _Offset, BinMsg}, ok) -> - case TransformFun(BinMsg) of + case TransformFun(binary_to_term(BinMsg)) of {ok, MsgNew} -> {ok, _} = rabbit_msg_file:append(RefNew, Guid, MsgNew), ok; diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 692d2473..53e707f4 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -37,7 +37,8 @@ fun ((rabbit_types:binding()) -> boolean())) -> match_result()). -spec(match_routing_key/2 :: (rabbit_types:binding_source(), - routing_key() | '_') -> match_result()). + [routing_key()] | ['_']) -> + match_result()). -endif. @@ -82,12 +83,22 @@ match_bindings(SrcName, Match) -> Match(Binding)]), mnesia:async_dirty(fun qlc:e/1, [Query]). -match_routing_key(SrcName, RoutingKey) -> +match_routing_key(SrcName, [RoutingKey]) -> MatchHead = #route{binding = #binding{source = SrcName, destination = '$1', key = RoutingKey, _ = '_'}}, - mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]). + mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]); +match_routing_key(SrcName, [_|_] = RoutingKeys) -> + Condition = list_to_tuple(['orelse' | [{'=:=', '$2', RKey} || + RKey <- RoutingKeys]]), + MatchHead = #route{binding = #binding{source = SrcName, + destination = '$1', + key = '$2', + _ = '_'}}, + mnesia:dirty_select(rabbit_route, [{MatchHead, [Condition], ['$1']}]). + + %%-------------------------------------------------------------------- diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index b0781f8f..4eb9c3b8 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -22,7 +22,7 @@ requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1, multiple_routing_keys/0]). + status/1, store_names/0]). -export([start/1, stop/0]). @@ -294,8 +294,6 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({multiple_routing_keys, []}). - -ifdef(use_specs). -type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). @@ -1804,29 +1802,5 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1) end. -%%---------------------------------------------------------------------------- -%% Upgrading -%%---------------------------------------------------------------------------- - -multiple_routing_keys() -> - transform_storage( - fun (BinMsg) -> - case binary_to_term(BinMsg) of - {basic_message, ExchangeName, Routing_Key, Content, Guid, - Persistent} -> - {ok, {basic_message, ExchangeName, [Routing_Key], Content, - Guid, Persistent}}; - _ -> - {error, corrupt_message} - end - end), - ok. - -%% Assumes message store is not running -transform_storage(TransformFun) -> - transform_store(?PERSISTENT_MSG_STORE, TransformFun), - transform_store(?TRANSIENT_MSG_STORE, TransformFun). - -transform_store(Store, TransformFun) -> - rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store), - rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun). +store_names() -> + [?PERSISTENT_MSG_STORE, ?TRANSIENT_MSG_STORE]. |