summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-02-28 12:21:22 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-02-28 12:21:22 +0000
commit99c370c70001bee20f2cc5144321bac86e7924eb (patch)
treea2a41845121b754ac3ce931f64c3c35c9cde0edc
parentbbc9fcbcb631404e46259a606649a6bb5648db57 (diff)
parentfcb9a05d24be5a256de6539b0208371cf17aae8f (diff)
downloadrabbitmq-server-99c370c70001bee20f2cc5144321bac86e7924eb.tar.gz
Merging bug23483 to default
-rw-r--r--codegen.py2
-rw-r--r--include/rabbit.hrl5
-rw-r--r--src/rabbit.erl1
-rw-r--r--src/rabbit_basic.erl71
-rw-r--r--src/rabbit_channel.erl58
-rw-r--r--src/rabbit_exchange_type_direct.erl4
-rw-r--r--src/rabbit_exchange_type_fanout.erl2
-rw-r--r--src/rabbit_exchange_type_topic.erl8
-rw-r--r--src/rabbit_msg_file.erl67
-rw-r--r--src/rabbit_msg_store.erl58
-rw-r--r--src/rabbit_router.erl17
-rw-r--r--src/rabbit_tests.erl4
-rw-r--r--src/rabbit_types.erl2
-rw-r--r--src/rabbit_variable_queue.erl30
14 files changed, 226 insertions, 103 deletions
diff --git a/codegen.py b/codegen.py
index 1fd5bc69..8cd9dab8 100644
--- a/codegen.py
+++ b/codegen.py
@@ -324,7 +324,7 @@ def genErl(spec):
-type(amqp_field_type() ::
'longstr' | 'signedint' | 'decimal' | 'timestamp' |
'table' | 'byte' | 'double' | 'float' | 'long' |
- 'short' | 'bool' | 'binary' | 'void').
+ 'short' | 'bool' | 'binary' | 'void' | 'array').
-type(amqp_property_type() ::
'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' |
'longlongint' | 'timestamp' | 'bit' | 'table').
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index b9a01735..4d75b546 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -62,7 +62,7 @@
-record(listener, {node, protocol, host, ip_address, port}).
--record(basic_message, {exchange_name, routing_key, content, guid,
+-record(basic_message, {exchange_name, routing_keys = [], content, guid,
is_persistent}).
-record(ssl_socket, {tcp, ssl}).
@@ -87,6 +87,9 @@
-define(DESIRED_HIBERNATE, 10000).
-define(STATS_INTERVAL, 5000).
+-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
+-define(DELETED_HEADER, <<"BCC">>).
+
-ifdef(debug).
-define(LOGDEBUG0(F), rabbit_log:debug(F)).
-define(LOGDEBUG(F,A), rabbit_log:debug(F,A)).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index faf484af..6eb59c3e 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -38,6 +38,7 @@
-rabbit_boot_step({database,
[{mfa, {rabbit_mnesia, init, []}},
+ {requires, file_handle_cache},
{enables, external_infrastructure}]}).
-rabbit_boot_step({file_handle_cache,
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index c5bd9575..57aad808 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -18,10 +18,9 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/4, properties/1, delivery/5]).
+-export([publish/1, message/3, message/4, properties/1, delivery/5]).
-export([publish/4, publish/7]).
-export([build_content/2, from_content/1]).
--export([is_message_persistent/1]).
%%----------------------------------------------------------------------------
@@ -41,8 +40,11 @@
rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
- properties_input(), binary()) ->
- (rabbit_types:message() | rabbit_types:error(any()))).
+ properties_input(), binary()) -> rabbit_types:message()).
+-spec(message/3 ::
+ (rabbit_exchange:name(), rabbit_router:routing_key(),
+ rabbit_types:decoded_content()) ->
+ rabbit_types:ok_or_error2(rabbit_types:message(), any())).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
-spec(publish/4 ::
@@ -56,9 +58,6 @@
rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
{rabbit_framing:amqp_property_record(), binary()}).
--spec(is_message_persistent/1 :: (rabbit_types:decoded_content()) ->
- (boolean() |
- {'invalid', non_neg_integer()})).
-endif.
@@ -98,19 +97,40 @@ from_content(Content) ->
rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
{Props, list_to_binary(lists:reverse(FragmentsRev))}.
-message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
+%% This breaks the spec rule forbidding message modification
+strip_header(#content{properties = #'P_basic'{headers = undefined}}
+ = DecodedContent, _Key) ->
+ DecodedContent;
+strip_header(#content{properties = Props = #'P_basic'{headers = Headers}}
+ = DecodedContent, Key) ->
+ case lists:keysearch(Key, 1, Headers) of
+ false -> DecodedContent;
+ {value, Found} -> Headers0 = lists:delete(Found, Headers),
+ rabbit_binary_generator:clear_encoded_content(
+ DecodedContent#content{
+ properties = Props#'P_basic'{
+ headers = Headers0}})
+ end.
+
+message(ExchangeName, RoutingKey,
+ #content{properties = Props} = DecodedContent) ->
+ try
+ {ok, #basic_message{
+ exchange_name = ExchangeName,
+ content = strip_header(DecodedContent, ?DELETED_HEADER),
+ guid = rabbit_guid:guid(),
+ is_persistent = is_message_persistent(DecodedContent),
+ routing_keys = [RoutingKey |
+ header_routes(Props#'P_basic'.headers)]}}
+ catch
+ {error, _Reason} = Error -> Error
+ end.
+
+message(ExchangeName, RoutingKey, RawProperties, BodyBin) ->
Properties = properties(RawProperties),
Content = build_content(Properties, BodyBin),
- case is_message_persistent(Content) of
- {invalid, Other} ->
- {error, {invalid_delivery_mode, Other}};
- IsPersistent when is_boolean(IsPersistent) ->
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKeyBin,
- content = Content,
- guid = rabbit_guid:guid(),
- is_persistent = IsPersistent}
- end.
+ {ok, Msg} = message(ExchangeName, RoutingKey, Content),
+ Msg.
properties(P = #'P_basic'{}) ->
P;
@@ -152,5 +172,18 @@ is_message_persistent(#content{properties = #'P_basic'{
1 -> false;
2 -> true;
undefined -> false;
- Other -> {invalid, Other}
+ Other -> throw({error, {delivery_mode_unknown, Other}})
end.
+
+% Extract CC routes from headers
+header_routes(undefined) ->
+ [];
+header_routes(HeadersTable) ->
+ lists:append(
+ [case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of
+ {array, Routes} -> [Route || {longstr, Route} <- Routes];
+ undefined -> [];
+ {Type, _Val} -> throw({error, {unacceptable_type_in_header,
+ Type,
+ binary_to_list(HeaderKey)}})
+ end || HeaderKey <- ?ROUTING_HEADERS]).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7dc07e5a..e92421fc 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -254,7 +254,7 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
handle_cast({deliver, ConsumerTag, AckRequired,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
+ routing_keys = [RoutingKey | _CcRoutes],
content = Content}}},
State = #ch{writer_pid = WriterPid,
next_tag = DeliveryTag}) ->
@@ -593,32 +593,33 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
check_user_id_header(DecodedContent#content.properties, State),
- IsPersistent = is_message_persistent(DecodedContent),
{MsgSeqNo, State1} =
case ConfirmEnabled of
false -> {undefined, State};
true -> SeqNo = State#ch.publish_seqno,
{SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
- Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = DecodedContent,
- guid = rabbit_guid:guid(),
- is_persistent = IsPersistent},
- {RoutingRes, DeliveredQPids} =
- rabbit_exchange:publish(
- Exchange,
- rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
- MsgSeqNo)),
- State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName,
- MsgSeqNo, Message, State1),
- maybe_incr_stats([{ExchangeName, 1} |
- [{{QPid, ExchangeName}, 1} ||
- QPid <- DeliveredQPids]], publish, State2),
- {noreply, case TxnKey of
- none -> State2;
- _ -> add_tx_participants(DeliveredQPids, State2)
- end};
+ case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
+ {ok, Message} ->
+ {RoutingRes, DeliveredQPids} =
+ rabbit_exchange:publish(
+ Exchange,
+ rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
+ MsgSeqNo)),
+ State2 = process_routing_result(RoutingRes, DeliveredQPids,
+ ExchangeName, MsgSeqNo, Message,
+ State1),
+ maybe_incr_stats([{ExchangeName, 1} |
+ [{{QPid, ExchangeName}, 1} ||
+ QPid <- DeliveredQPids]], publish, State2),
+ {noreply, case TxnKey of
+ none -> State2;
+ _ -> add_tx_participants(DeliveredQPids, State2)
+ end};
+ {error, Reason} ->
+ rabbit_misc:protocol_error(precondition_failed,
+ "invalid message: ~p", [Reason])
+ end;
handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
multiple = Multiple,
@@ -658,7 +659,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
{ok, MessageCount,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
+ routing_keys = [RoutingKey | _CcRoutes],
content = Content}}} ->
State1 = lock_message(not(NoAck),
ack_record(DeliveryTag, none, Msg),
@@ -1123,7 +1124,7 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
end.
basic_return(#basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
+ routing_keys = [RoutingKey | _CcRoutes],
content = Content},
#ch{protocol = Protocol, writer_pid = WriterPid}, Reason) ->
{_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason),
@@ -1274,17 +1275,6 @@ notify_limiter(LimiterPid, Acked) ->
Count -> rabbit_limiter:ack(LimiterPid, Count)
end.
-is_message_persistent(Content) ->
- case rabbit_basic:is_message_persistent(Content) of
- {invalid, Other} ->
- rabbit_log:warning("Unknown delivery mode ~p - "
- "treating as 1, non-persistent~n",
- [Other]),
- false;
- IsPersistent when is_boolean(IsPersistent) ->
- IsPersistent
- end.
-
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index c51b0913..349c2f6e 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -36,8 +36,8 @@ description() ->
{description, <<"AMQP direct exchange, as per the AMQP specification">>}].
route(#exchange{name = Name},
- #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:match_routing_key(Name, RoutingKey).
+ #delivery{message = #basic_message{routing_keys = Routes}}) ->
+ rabbit_router:match_routing_key(Name, Routes).
validate(_X) -> ok.
create(_Tx, _X) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 382fb627..bc5293c8 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -36,7 +36,7 @@ description() ->
{description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
route(#exchange{name = Name}, _Delivery) ->
- rabbit_router:match_routing_key(Name, '_').
+ rabbit_router:match_routing_key(Name, ['_']).
validate(_X) -> ok.
create(_Tx, _X) -> ok.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index c1741b30..2363d05e 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -40,9 +40,11 @@ description() ->
%% NB: This may return duplicate results in some situations (that's ok)
route(#exchange{name = X},
- #delivery{message = #basic_message{routing_key = Key}}) ->
- Words = split_topic_key(Key),
- mnesia:async_dirty(fun trie_match/2, [X, Words]).
+ #delivery{message = #basic_message{routing_keys = Routes}}) ->
+ lists:append([begin
+ Words = split_topic_key(RKey),
+ mnesia:async_dirty(fun trie_match/2, [X, Words])
+ end || RKey <- Routes]).
validate(_X) -> ok.
create(_Tx, _X) -> ok.
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index cfea4982..55e6ac47 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -16,7 +16,7 @@
-module(rabbit_msg_file).
--export([append/3, read/2, scan/2]).
+-export([append/3, read/2, scan/4]).
%%----------------------------------------------------------------------------
@@ -45,9 +45,9 @@
-spec(read/2 :: (io_device(), msg_size()) ->
rabbit_types:ok_or_error2({rabbit_guid:guid(), msg()},
any())).
--spec(scan/2 :: (io_device(), file_size()) ->
- {'ok', [{rabbit_guid:guid(), msg_size(), position()}],
- position()}).
+-spec(scan/4 :: (io_device(), file_size(),
+ fun (({rabbit_guid:guid(), msg_size(), position(), binary()}, A) -> A),
+ A) -> {'ok', A, position()}).
-endif.
@@ -79,43 +79,44 @@ read(FileHdl, TotalSize) ->
KO -> KO
end.
-scan(FileHdl, FileSize) when FileSize >= 0 ->
- scan(FileHdl, FileSize, <<>>, 0, [], 0).
+scan(FileHdl, FileSize, Fun, Acc) when FileSize >= 0 ->
+ scan(FileHdl, FileSize, <<>>, 0, 0, Fun, Acc).
-scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset) ->
+scan(_FileHdl, FileSize, _Data, FileSize, ScanOffset, _Fun, Acc) ->
{ok, Acc, ScanOffset};
-scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset) ->
+scan(FileHdl, FileSize, Data, ReadOffset, ScanOffset, Fun, Acc) ->
Read = lists:min([?SCAN_BLOCK_SIZE, (FileSize - ReadOffset)]),
case file_handle_cache:read(FileHdl, Read) of
{ok, Data1} ->
{Data2, Acc1, ScanOffset1} =
- scan(<<Data/binary, Data1/binary>>, Acc, ScanOffset),
+ scanner(<<Data/binary, Data1/binary>>, ScanOffset, Fun, Acc),
ReadOffset1 = ReadOffset + size(Data1),
- scan(FileHdl, FileSize, Data2, ReadOffset1, Acc1, ScanOffset1);
+ scan(FileHdl, FileSize, Data2, ReadOffset1, ScanOffset1, Fun, Acc1);
_KO ->
{ok, Acc, ScanOffset}
end.
-scan(<<>>, Acc, Offset) ->
- {<<>>, Acc, Offset};
-scan(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) ->
- {<<>>, Acc, Offset}; %% Nothing to do other than stop.
-scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
- WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset) ->
- TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
- case WriteMarker of
- ?WRITE_OK_MARKER ->
- %% Here we take option 5 from
- %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in
- %% which we read the Guid as a number, and then convert it
- %% back to a binary in order to work around bugs in
- %% Erlang's GC.
- <<GuidNum:?GUID_SIZE_BITS, _Msg/binary>> =
- <<GuidAndMsg:Size/binary>>,
- <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>,
- scan(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize);
- _ ->
- scan(Rest, Acc, Offset + TotalSize)
- end;
-scan(Data, Acc, Offset) ->
- {Data, Acc, Offset}.
+scanner(<<>>, Offset, _Fun, Acc) ->
+ {<<>>, Acc, Offset};
+scanner(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Offset, _Fun, Acc) ->
+ {<<>>, Acc, Offset}; %% Nothing to do other than stop.
+scanner(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
+ WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Offset, Fun, Acc) ->
+ TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
+ case WriteMarker of
+ ?WRITE_OK_MARKER ->
+ %% Here we take option 5 from
+ %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in
+ %% which we read the Guid as a number, and then convert it
+ %% back to a binary in order to work around bugs in
+ %% Erlang's GC.
+ <<GuidNum:?GUID_SIZE_BITS, Msg/binary>> =
+ <<GuidAndMsg:Size/binary>>,
+ <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>,
+ scanner(Rest, Offset + TotalSize, Fun,
+ Fun({Guid, TotalSize, Offset, Msg}, Acc));
+ _ ->
+ scanner(Rest, Offset + TotalSize, Fun, Acc)
+ end;
+scanner(Data, Offset, _Fun, Acc) ->
+ {Data, Acc, Offset}.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 7f3cf35f..9e65e442 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -26,6 +26,8 @@
-export([sync/1, set_maximum_since_use/2,
has_readers/2, combine_files/3, delete_file/2]). %% internal
+-export([transform_dir/3, force_recovery/2]). %% upgrade
+
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]).
@@ -33,9 +35,10 @@
-include("rabbit_msg_store.hrl").
--define(SYNC_INTERVAL, 25). %% milliseconds
+-define(SYNC_INTERVAL, 5). %% milliseconds
-define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
+-define(TRANSFORM_TMP, "transform_tmp").
-define(BINARY_MODE, [raw, binary]).
-define(READ_MODE, [read]).
@@ -160,6 +163,9 @@
-spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) ->
deletion_thunk()).
-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> deletion_thunk()).
+-spec(force_recovery/2 :: (file:filename(), server()) -> 'ok').
+-spec(transform_dir/3 :: (file:filename(), server(),
+ fun ((any()) -> (rabbit_types:ok_or_error2(msg(), any())))) -> 'ok').
-endif.
@@ -1523,7 +1529,8 @@ scan_file_for_valid_messages(Dir, FileName) ->
case open_file(Dir, FileName, ?READ_MODE) of
{ok, Hdl} -> Valid = rabbit_msg_file:scan(
Hdl, filelib:file_size(
- form_filename(Dir, FileName))),
+ form_filename(Dir, FileName)),
+ fun scan_fun/2, []),
%% if something really bad has happened,
%% the close could fail, but ignore
file_handle_cache:close(Hdl),
@@ -1532,6 +1539,9 @@ scan_file_for_valid_messages(Dir, FileName) ->
{error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
end.
+scan_fun({Guid, TotalSize, Offset, _Msg}, Acc) ->
+ [{Guid, TotalSize, Offset} | Acc].
+
%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
%% produces. The list of msgs that is produced is youngest first.
@@ -1956,3 +1966,47 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
{got, FinalOffsetZ},
{destination, Destination}]}
end.
+
+force_recovery(BaseDir, Store) ->
+ Dir = filename:join(BaseDir, atom_to_list(Store)),
+ file:delete(filename:join(Dir, ?CLEAN_FILENAME)),
+ recover_crashed_compactions(BaseDir),
+ ok.
+
+foreach_file(D, Fun, Files) ->
+ [Fun(filename:join(D, File)) || File <- Files].
+
+foreach_file(D1, D2, Fun, Files) ->
+ [Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files].
+
+transform_dir(BaseDir, Store, TransformFun) ->
+ Dir = filename:join(BaseDir, atom_to_list(Store)),
+ TmpDir = filename:join(Dir, ?TRANSFORM_TMP),
+ TransformFile = fun (A, B) -> transform_msg_file(A, B, TransformFun) end,
+ case filelib:is_dir(TmpDir) of
+ true -> throw({error, transform_failed_previously});
+ false -> FileList = list_sorted_file_names(Dir, ?FILE_EXTENSION),
+ foreach_file(Dir, TmpDir, TransformFile, FileList),
+ foreach_file(Dir, fun file:delete/1, FileList),
+ foreach_file(TmpDir, Dir, fun file:copy/2, FileList),
+ foreach_file(TmpDir, fun file:delete/1, FileList),
+ ok = file:del_dir(TmpDir)
+ end.
+
+transform_msg_file(FileOld, FileNew, TransformFun) ->
+ rabbit_misc:ensure_parent_dirs_exist(FileNew),
+ {ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []),
+ {ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write],
+ [{write_buffer,
+ ?HANDLE_CACHE_BUFFER_SIZE}]),
+ {ok, _Acc, _IgnoreSize} =
+ rabbit_msg_file:scan(
+ RefOld, filelib:file_size(FileOld),
+ fun({Guid, _Size, _Offset, BinMsg}, ok) ->
+ {ok, MsgNew} = TransformFun(binary_to_term(BinMsg)),
+ {ok, _} = rabbit_msg_file:append(RefNew, Guid, MsgNew),
+ ok
+ end, ok),
+ file_handle_cache:close(RefOld),
+ file_handle_cache:close(RefNew),
+ ok.
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 692d2473..53e707f4 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -37,7 +37,8 @@
fun ((rabbit_types:binding()) -> boolean())) ->
match_result()).
-spec(match_routing_key/2 :: (rabbit_types:binding_source(),
- routing_key() | '_') -> match_result()).
+ [routing_key()] | ['_']) ->
+ match_result()).
-endif.
@@ -82,12 +83,22 @@ match_bindings(SrcName, Match) ->
Match(Binding)]),
mnesia:async_dirty(fun qlc:e/1, [Query]).
-match_routing_key(SrcName, RoutingKey) ->
+match_routing_key(SrcName, [RoutingKey]) ->
MatchHead = #route{binding = #binding{source = SrcName,
destination = '$1',
key = RoutingKey,
_ = '_'}},
- mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]).
+ mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]);
+match_routing_key(SrcName, [_|_] = RoutingKeys) ->
+ Condition = list_to_tuple(['orelse' | [{'=:=', '$2', RKey} ||
+ RKey <- RoutingKeys]]),
+ MatchHead = #route{binding = #binding{source = SrcName,
+ destination = '$1',
+ key = '$2',
+ _ = '_'}},
+ mnesia:dirty_select(rabbit_route, [{MatchHead, [Condition], ['$1']}]).
+
+
%%--------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3515890e..0c6250df 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -701,8 +701,8 @@ test_topic_expect_match(X, List) ->
fun ({Key, Expected}) ->
BinKey = list_to_binary(Key),
Res = rabbit_exchange_type_topic:route(
- X, #delivery{message = #basic_message{routing_key =
- BinKey}}),
+ X, #delivery{message = #basic_message{routing_keys =
+ [BinKey]}}),
ExpectedRes = lists:map(
fun (Q) -> #resource{virtual_host = <<"/">>,
kind = queue,
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 3dbe740f..ab2300c0 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -64,7 +64,7 @@
-type(content() :: undecoded_content() | decoded_content()).
-type(basic_message() ::
#basic_message{exchange_name :: rabbit_exchange:name(),
- routing_key :: rabbit_router:routing_key(),
+ routing_keys :: [rabbit_router:routing_key()],
content :: content(),
guid :: rabbit_guid:guid(),
is_persistent :: boolean()}).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ea7caba8..d1307b85 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -22,7 +22,7 @@
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1]).
+ status/1, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -294,6 +294,8 @@
%%----------------------------------------------------------------------------
+-rabbit_upgrade({multiple_routing_keys, []}).
+
-ifdef(use_specs).
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
@@ -351,6 +353,8 @@
-include("rabbit_backing_queue_spec.hrl").
+-spec(multiple_routing_keys/0 :: () -> 'ok').
+
-endif.
-define(BLANK_DELTA, #delta { start_seq_id = undefined,
@@ -1801,3 +1805,27 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
push_betas_to_deltas(
Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1)
end.
+
+%%----------------------------------------------------------------------------
+%% Upgrading
+%%----------------------------------------------------------------------------
+
+multiple_routing_keys() ->
+ transform_storage(
+ fun ({basic_message, ExchangeName, Routing_Key, Content,
+ Guid, Persistent}) ->
+ {ok, {basic_message, ExchangeName, [Routing_Key], Content,
+ Guid, Persistent}};
+ (_) -> {error, corrupt_message}
+ end),
+ ok.
+
+
+%% Assumes message store is not running
+transform_storage(TransformFun) ->
+ transform_store(?PERSISTENT_MSG_STORE, TransformFun),
+ transform_store(?TRANSIENT_MSG_STORE, TransformFun).
+
+transform_store(Store, TransformFun) ->
+ rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
+ rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).