summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_basic.erl31
-rw-r--r--src/rabbit_exchange_type_direct.erl3
-rw-r--r--src/rabbit_exchange_type_fanout.erl2
-rw-r--r--src/rabbit_msg_file.erl24
-rw-r--r--src/rabbit_msg_store.erl41
-rw-r--r--src/rabbit_router.erl17
-rw-r--r--src/rabbit_variable_queue.erl32
7 files changed, 77 insertions, 73 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 503f01bc..376a303e 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -31,7 +31,6 @@
-type(publish_result() ::
({ok, rabbit_router:routing_result(), [pid()]}
| rabbit_types:error('not_found'))).
--type(msg_or_error() :: {'ok', rabbit_types:message()} | {'error', any()}).
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
@@ -41,10 +40,11 @@
rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
- properties_input(), binary()) -> msg_or_error()).
+ properties_input(), binary()) -> rabbit_types:message()).
-spec(message/3 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
- rabbit_types:decoded_content()) -> msg_or_error()).
+ rabbit_types:decoded_content()) -> {'ok', rabbit_types:message()} |
+ {'error', any()}).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
-spec(publish/4 ::
@@ -98,17 +98,19 @@ from_content(Content) ->
{Props, list_to_binary(lists:reverse(FragmentsRev))}.
%% This breaks the spec rule forbidding message modification
+strip_header(#content{properties = #'P_basic'{headers = undefined}}
+ = DecodedContent, _Key) ->
+ DecodedContent;
strip_header(#content{properties = Props = #'P_basic'{headers = Headers}}
- = DecodedContent, Key) when Headers =/= undefined ->
- case lists:keyfind(Key, 1, Headers) of
- false -> DecodedContent;
- Found -> Headers0 = lists:delete(Found, Headers),
- rabbit_binary_generator:clear_encoded_content(
- DecodedContent#content{
- properties = Props#'P_basic'{headers = Headers0}})
- end;
-strip_header(DecodedContent, _Key) ->
- DecodedContent.
+ = DecodedContent, Key) ->
+ case lists:keysearch(Key, 1, Headers) of
+ false -> DecodedContent;
+ {value, Found} -> Headers0 = lists:delete(Found, Headers),
+ rabbit_binary_generator:clear_encoded_content(
+ DecodedContent#content{
+ properties = Props#'P_basic'{
+ headers = Headers0}})
+ end.
message(ExchangeName, RoutingKey,
#content{properties = Props} = DecodedContent) ->
@@ -170,7 +172,7 @@ is_message_persistent(#content{properties = #'P_basic'{
1 -> false;
2 -> true;
undefined -> false;
- _ -> false
+ Other -> throw({error, {delivery_mode_unknown, Other}})
end.
% Extract CC routes from headers
@@ -185,4 +187,3 @@ header_routes(HeadersTable) ->
Type,
binary_to_list(HeaderKey)}})
end || HeaderKey <- ?ROUTING_HEADERS]).
-
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 82776c4a..349c2f6e 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -37,8 +37,7 @@ description() ->
route(#exchange{name = Name},
#delivery{message = #basic_message{routing_keys = Routes}}) ->
- lists:append([rabbit_router:match_routing_key(Name, RKey) ||
- RKey <- Routes]).
+ rabbit_router:match_routing_key(Name, Routes).
validate(_X) -> ok.
create(_Tx, _X) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 382fb627..bc5293c8 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -36,7 +36,7 @@ description() ->
{description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
route(#exchange{name = Name}, _Delivery) ->
- rabbit_router:match_routing_key(Name, '_').
+ rabbit_router:match_routing_key(Name, ['_']).
validate(_X) -> ok.
create(_Tx, _X) -> ok.
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index 81f2f07e..55e6ac47 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -80,28 +80,28 @@ read(FileHdl, TotalSize) ->
end.
scan(FileHdl, FileSize, Fun, Acc) when FileSize >= 0 ->
- scan(FileHdl, FileSize, <<>>, 0, Acc, 0, Fun).
+ scan(FileHdl, FileSize, <<>>, 0, 0, Fun, Acc).
-scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset, _Fun) ->
+scan(_FileHdl, FileSize, _Data, FileSize, ScanOffset, _Fun, Acc) ->
{ok, Acc, ScanOffset};
-scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset, Fun) ->
+scan(FileHdl, FileSize, Data, ReadOffset, ScanOffset, Fun, Acc) ->
Read = lists:min([?SCAN_BLOCK_SIZE, (FileSize - ReadOffset)]),
case file_handle_cache:read(FileHdl, Read) of
{ok, Data1} ->
{Data2, Acc1, ScanOffset1} =
- scanner(<<Data/binary, Data1/binary>>, Acc, ScanOffset, Fun),
+ scanner(<<Data/binary, Data1/binary>>, ScanOffset, Fun, Acc),
ReadOffset1 = ReadOffset + size(Data1),
- scan(FileHdl, FileSize, Data2, ReadOffset1, Acc1, ScanOffset1, Fun);
+ scan(FileHdl, FileSize, Data2, ReadOffset1, ScanOffset1, Fun, Acc1);
_KO ->
{ok, Acc, ScanOffset}
end.
-scanner(<<>>, Acc, Offset, _Fun) ->
+scanner(<<>>, Offset, _Fun, Acc) ->
{<<>>, Acc, Offset};
-scanner(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset, _Fun) ->
+scanner(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Offset, _Fun, Acc) ->
{<<>>, Acc, Offset}; %% Nothing to do other than stop.
scanner(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
- WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset, Fun) ->
+ WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Offset, Fun, Acc) ->
TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
case WriteMarker of
?WRITE_OK_MARKER ->
@@ -113,10 +113,10 @@ scanner(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
<<GuidNum:?GUID_SIZE_BITS, Msg/binary>> =
<<GuidAndMsg:Size/binary>>,
<<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>,
- scanner(Rest, Fun({Guid, TotalSize, Offset, Msg}, Acc),
- Offset + TotalSize, Fun);
+ scanner(Rest, Offset + TotalSize, Fun,
+ Fun({Guid, TotalSize, Offset, Msg}, Acc));
_ ->
- scanner(Rest, Acc, Offset + TotalSize, Fun)
+ scanner(Rest, Offset + TotalSize, Fun, Acc)
end;
-scanner(Data, Acc, Offset, _Fun) ->
+scanner(Data, Offset, _Fun, Acc) ->
{Data, Acc, Offset}.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index a2f6d7e2..d798c4f7 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -26,7 +26,7 @@
-export([sync/1, set_maximum_since_use/2,
has_readers/2, combine_files/3, delete_file/2]). %% internal
--export([transform_dir/3, force_recovery/2]). %% upgrade
+-export([multiple_routing_keys/0]). %% upgrade
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]).
@@ -106,6 +106,8 @@
%%----------------------------------------------------------------------------
+-rabbit_upgrade({multiple_routing_keys, []}).
+
-ifdef(use_specs).
-export_type([gc_state/0, file_num/0]).
@@ -164,9 +166,7 @@
-spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) ->
deletion_thunk()).
-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> deletion_thunk()).
--spec(force_recovery/2 :: (file:filename(), server()) -> 'ok').
--spec(transform_dir/3 :: (file:filename(), server(),
- fun ((binary()) -> ({'ok', msg()} | {error, any()}))) -> 'ok').
+-spec(multiple_routing_keys/0 :: () -> 'ok').
-endif.
@@ -1968,6 +1968,25 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
{destination, Destination}]}
end.
+%%----------------------------------------------------------------------------
+%% upgrade
+%%----------------------------------------------------------------------------
+
+multiple_routing_keys() ->
+ [transform_store(
+ fun ({basic_message, ExchangeName, Routing_Key, Content,
+ Guid, Persistent}) ->
+ {ok, {basic_message, ExchangeName, [Routing_Key], Content,
+ Guid, Persistent}};
+ (_) -> {error, corrupt_message}
+ end, Store) || Store <- rabbit_variable_queue:store_names()],
+ ok.
+
+%% Assumes message store is not running
+transform_store(TransformFun, Store) ->
+ force_recovery(rabbit_mnesia:dir(), Store),
+ transform_dir(rabbit_mnesia:dir(), Store, TransformFun).
+
force_recovery(BaseDir, Store) ->
Dir = filename:join(BaseDir, atom_to_list(Store)),
file:delete(filename:join(Dir, ?CLEAN_FILENAME)),
@@ -1975,10 +1994,10 @@ force_recovery(BaseDir, Store) ->
File <- list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP)],
ok.
-for_each_file(D, Fun, Files) ->
+foreach_file(D, Fun, Files) ->
[Fun(filename:join(D, File)) || File <- Files].
-for_each_file(D1, D2, Fun, Files) ->
+foreach_file(D1, D2, Fun, Files) ->
[Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files].
transform_dir(BaseDir, Store, TransformFun) ->
@@ -1988,11 +2007,11 @@ transform_dir(BaseDir, Store, TransformFun) ->
case filelib:is_dir(TmpDir) of
true -> throw({error, transform_failed_previously});
false -> OldFileList = list_sorted_file_names(Dir, ?FILE_EXTENSION),
- for_each_file(Dir, TmpDir, TransformFile, OldFileList),
- for_each_file(Dir, fun file:delete/1, OldFileList),
+ foreach_file(Dir, TmpDir, TransformFile, OldFileList),
+ foreach_file(Dir, fun file:delete/1, OldFileList),
NewFileList = list_sorted_file_names(TmpDir, ?FILE_EXTENSION),
- for_each_file(TmpDir, Dir, fun file:copy/2, NewFileList),
- for_each_file(TmpDir, fun file:delete/1, NewFileList),
+ foreach_file(TmpDir, Dir, fun file:copy/2, NewFileList),
+ foreach_file(TmpDir, fun file:delete/1, NewFileList),
ok = file:del_dir(TmpDir)
end.
@@ -2007,7 +2026,7 @@ transform_msg_file(FileOld, FileNew, TransformFun) ->
rabbit_msg_file:scan(
RefOld, Size,
fun({Guid, _Size, _Offset, BinMsg}, ok) ->
- case TransformFun(BinMsg) of
+ case TransformFun(binary_to_term(BinMsg)) of
{ok, MsgNew} ->
{ok, _} = rabbit_msg_file:append(RefNew, Guid, MsgNew),
ok;
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 692d2473..53e707f4 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -37,7 +37,8 @@
fun ((rabbit_types:binding()) -> boolean())) ->
match_result()).
-spec(match_routing_key/2 :: (rabbit_types:binding_source(),
- routing_key() | '_') -> match_result()).
+ [routing_key()] | ['_']) ->
+ match_result()).
-endif.
@@ -82,12 +83,22 @@ match_bindings(SrcName, Match) ->
Match(Binding)]),
mnesia:async_dirty(fun qlc:e/1, [Query]).
-match_routing_key(SrcName, RoutingKey) ->
+match_routing_key(SrcName, [RoutingKey]) ->
MatchHead = #route{binding = #binding{source = SrcName,
destination = '$1',
key = RoutingKey,
_ = '_'}},
- mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]).
+ mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]);
+match_routing_key(SrcName, [_|_] = RoutingKeys) ->
+ Condition = list_to_tuple(['orelse' | [{'=:=', '$2', RKey} ||
+ RKey <- RoutingKeys]]),
+ MatchHead = #route{binding = #binding{source = SrcName,
+ destination = '$1',
+ key = '$2',
+ _ = '_'}},
+ mnesia:dirty_select(rabbit_route, [{MatchHead, [Condition], ['$1']}]).
+
+
%%--------------------------------------------------------------------
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b0781f8f..4eb9c3b8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -22,7 +22,7 @@
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1, multiple_routing_keys/0]).
+ status/1, store_names/0]).
-export([start/1, stop/0]).
@@ -294,8 +294,6 @@
%%----------------------------------------------------------------------------
--rabbit_upgrade({multiple_routing_keys, []}).
-
-ifdef(use_specs).
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
@@ -1804,29 +1802,5 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1)
end.
-%%----------------------------------------------------------------------------
-%% Upgrading
-%%----------------------------------------------------------------------------
-
-multiple_routing_keys() ->
- transform_storage(
- fun (BinMsg) ->
- case binary_to_term(BinMsg) of
- {basic_message, ExchangeName, Routing_Key, Content, Guid,
- Persistent} ->
- {ok, {basic_message, ExchangeName, [Routing_Key], Content,
- Guid, Persistent}};
- _ ->
- {error, corrupt_message}
- end
- end),
- ok.
-
-%% Assumes message store is not running
-transform_storage(TransformFun) ->
- transform_store(?PERSISTENT_MSG_STORE, TransformFun),
- transform_store(?TRANSIENT_MSG_STORE, TransformFun).
-
-transform_store(Store, TransformFun) ->
- rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
- rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).
+store_names() ->
+ [?PERSISTENT_MSG_STORE, ?TRANSIENT_MSG_STORE].