summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-02-21 17:40:17 +0000
committerEmile Joubert <emile@rabbitmq.com>2011-02-21 17:40:17 +0000
commitae7bd1fae91979e83e462bb1d0fec41f0b65bceb (patch)
tree51a48f540104085983ead15e5995bfdb60112078
parent8ebfa5137cb826a70afaa602653b909ded0a0052 (diff)
parent46b79df30bb8eac1905dfd2cc73e760f753b99f6 (diff)
downloadrabbitmq-server-ae7bd1fae91979e83e462bb1d0fec41f0b65bceb.tar.gz
Merged default into bug23483
-rw-r--r--codegen.py2
-rw-r--r--include/rabbit.hrl5
-rw-r--r--include/rabbit_backing_queue_spec.hrl1
-rw-r--r--src/rabbit.erl1
-rw-r--r--src/rabbit_basic.erl70
-rw-r--r--src/rabbit_channel.erl58
-rw-r--r--src/rabbit_exchange_type_direct.erl5
-rw-r--r--src/rabbit_exchange_type_topic.erl8
-rw-r--r--src/rabbit_msg_file.erl67
-rw-r--r--src/rabbit_msg_store.erl67
-rw-r--r--src/rabbit_tests.erl4
-rw-r--r--src/rabbit_types.erl2
-rw-r--r--src/rabbit_variable_queue.erl31
13 files changed, 223 insertions, 98 deletions
diff --git a/codegen.py b/codegen.py
index 1fd5bc69..8cd9dab8 100644
--- a/codegen.py
+++ b/codegen.py
@@ -324,7 +324,7 @@ def genErl(spec):
-type(amqp_field_type() ::
'longstr' | 'signedint' | 'decimal' | 'timestamp' |
'table' | 'byte' | 'double' | 'float' | 'long' |
- 'short' | 'bool' | 'binary' | 'void').
+ 'short' | 'bool' | 'binary' | 'void' | 'array').
-type(amqp_property_type() ::
'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' |
'longlongint' | 'timestamp' | 'bit' | 'table').
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index b9a01735..4d75b546 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -62,7 +62,7 @@
-record(listener, {node, protocol, host, ip_address, port}).
--record(basic_message, {exchange_name, routing_key, content, guid,
+-record(basic_message, {exchange_name, routing_keys = [], content, guid,
is_persistent}).
-record(ssl_socket, {tcp, ssl}).
@@ -87,6 +87,9 @@
-define(DESIRED_HIBERNATE, 10000).
-define(STATS_INTERVAL, 5000).
+-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
+-define(DELETED_HEADER, <<"BCC">>).
+
-ifdef(debug).
-define(LOGDEBUG0(F), rabbit_log:debug(F)).
-define(LOGDEBUG(F,A), rabbit_log:debug(F,A)).
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index accb2c0e..17cdedc2 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -65,3 +65,4 @@
-spec(idle_timeout/1 :: (state()) -> state()).
-spec(handle_pre_hibernate/1 :: (state()) -> state()).
-spec(status/1 :: (state()) -> [{atom(), any()}]).
+-spec(multiple_routing_keys/0 :: () -> 'ok').
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 1beed5c1..1174cec6 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -38,6 +38,7 @@
-rabbit_boot_step({database,
[{mfa, {rabbit_mnesia, init, []}},
+ {requires, file_handle_cache},
{enables, external_infrastructure}]}).
-rabbit_boot_step({file_handle_cache,
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index c5bd9575..503f01bc 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -18,10 +18,9 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/4, properties/1, delivery/5]).
+-export([publish/1, message/3, message/4, properties/1, delivery/5]).
-export([publish/4, publish/7]).
-export([build_content/2, from_content/1]).
--export([is_message_persistent/1]).
%%----------------------------------------------------------------------------
@@ -32,6 +31,7 @@
-type(publish_result() ::
({ok, rabbit_router:routing_result(), [pid()]}
| rabbit_types:error('not_found'))).
+-type(msg_or_error() :: {'ok', rabbit_types:message()} | {'error', any()}).
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
@@ -41,8 +41,10 @@
rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
- properties_input(), binary()) ->
- (rabbit_types:message() | rabbit_types:error(any()))).
+ properties_input(), binary()) -> msg_or_error()).
+-spec(message/3 ::
+ (rabbit_exchange:name(), rabbit_router:routing_key(),
+ rabbit_types:decoded_content()) -> msg_or_error()).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
-spec(publish/4 ::
@@ -56,9 +58,6 @@
rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
{rabbit_framing:amqp_property_record(), binary()}).
--spec(is_message_persistent/1 :: (rabbit_types:decoded_content()) ->
- (boolean() |
- {'invalid', non_neg_integer()})).
-endif.
@@ -98,19 +97,38 @@ from_content(Content) ->
rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
{Props, list_to_binary(lists:reverse(FragmentsRev))}.
-message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
+%% This breaks the spec rule forbidding message modification
+strip_header(#content{properties = Props = #'P_basic'{headers = Headers}}
+ = DecodedContent, Key) when Headers =/= undefined ->
+ case lists:keyfind(Key, 1, Headers) of
+ false -> DecodedContent;
+ Found -> Headers0 = lists:delete(Found, Headers),
+ rabbit_binary_generator:clear_encoded_content(
+ DecodedContent#content{
+ properties = Props#'P_basic'{headers = Headers0}})
+ end;
+strip_header(DecodedContent, _Key) ->
+ DecodedContent.
+
+message(ExchangeName, RoutingKey,
+ #content{properties = Props} = DecodedContent) ->
+ try
+ {ok, #basic_message{
+ exchange_name = ExchangeName,
+ content = strip_header(DecodedContent, ?DELETED_HEADER),
+ guid = rabbit_guid:guid(),
+ is_persistent = is_message_persistent(DecodedContent),
+ routing_keys = [RoutingKey |
+ header_routes(Props#'P_basic'.headers)]}}
+ catch
+ {error, _Reason} = Error -> Error
+ end.
+
+message(ExchangeName, RoutingKey, RawProperties, BodyBin) ->
Properties = properties(RawProperties),
Content = build_content(Properties, BodyBin),
- case is_message_persistent(Content) of
- {invalid, Other} ->
- {error, {invalid_delivery_mode, Other}};
- IsPersistent when is_boolean(IsPersistent) ->
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKeyBin,
- content = Content,
- guid = rabbit_guid:guid(),
- is_persistent = IsPersistent}
- end.
+ {ok, Msg} = message(ExchangeName, RoutingKey, Content),
+ Msg.
properties(P = #'P_basic'{}) ->
P;
@@ -152,5 +170,19 @@ is_message_persistent(#content{properties = #'P_basic'{
1 -> false;
2 -> true;
undefined -> false;
- Other -> {invalid, Other}
+ _ -> false
end.
+
+% Extract CC routes from headers
+header_routes(undefined) ->
+ [];
+header_routes(HeadersTable) ->
+ lists:append(
+ [case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of
+ {array, Routes} -> [Route || {longstr, Route} <- Routes];
+ undefined -> [];
+ {Type, _Val} -> throw({error, {unacceptable_type_in_header,
+ Type,
+ binary_to_list(HeaderKey)}})
+ end || HeaderKey <- ?ROUTING_HEADERS]).
+
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 34a5e5a4..51250d4b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -254,7 +254,7 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
handle_cast({deliver, ConsumerTag, AckRequired,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
+ routing_keys = [RoutingKey | _CcRoutes],
content = Content}}},
State = #ch{writer_pid = WriterPid,
next_tag = DeliveryTag}) ->
@@ -593,32 +593,33 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
check_user_id_header(DecodedContent#content.properties, State),
- IsPersistent = is_message_persistent(DecodedContent),
{MsgSeqNo, State1} =
case ConfirmEnabled of
false -> {undefined, State};
true -> SeqNo = State#ch.publish_seqno,
{SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
- Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = DecodedContent,
- guid = rabbit_guid:guid(),
- is_persistent = IsPersistent},
- {RoutingRes, DeliveredQPids} =
- rabbit_exchange:publish(
- Exchange,
- rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
- MsgSeqNo)),
- State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName,
- MsgSeqNo, Message, State1),
- maybe_incr_stats([{ExchangeName, 1} |
- [{{QPid, ExchangeName}, 1} ||
- QPid <- DeliveredQPids]], publish, State2),
- {noreply, case TxnKey of
- none -> State2;
- _ -> add_tx_participants(DeliveredQPids, State2)
- end};
+ case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
+ {ok, Message} ->
+ {RoutingRes, DeliveredQPids} =
+ rabbit_exchange:publish(
+ Exchange,
+ rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
+ MsgSeqNo)),
+ State2 = process_routing_result(RoutingRes, DeliveredQPids,
+ ExchangeName, MsgSeqNo, Message,
+ State1),
+ maybe_incr_stats([{ExchangeName, 1} |
+ [{{QPid, ExchangeName}, 1} ||
+ QPid <- DeliveredQPids]], publish, State2),
+ {noreply, case TxnKey of
+ none -> State2;
+ _ -> add_tx_participants(DeliveredQPids, State2)
+ end};
+ {error, Reason} ->
+ rabbit_misc:protocol_error(precondition_failed,
+ "invalid message: ~p", [Reason])
+ end;
handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
multiple = Multiple,
@@ -658,7 +659,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
{ok, MessageCount,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
+ routing_keys = [RoutingKey | _CcRoutes],
content = Content}}} ->
State1 = lock_message(not(NoAck),
ack_record(DeliveryTag, none, Msg),
@@ -1123,7 +1124,7 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
end.
basic_return(#basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
+ routing_keys = [RoutingKey | _CcRoutes],
content = Content},
#ch{protocol = Protocol, writer_pid = WriterPid}, Reason) ->
{_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason),
@@ -1274,17 +1275,6 @@ notify_limiter(LimiterPid, Acked) ->
Count -> rabbit_limiter:ack(LimiterPid, Count)
end.
-is_message_persistent(Content) ->
- case rabbit_basic:is_message_persistent(Content) of
- {invalid, Other} ->
- rabbit_log:warning("Unknown delivery mode ~p - "
- "treating as 1, non-persistent~n",
- [Other]),
- false;
- IsPersistent when is_boolean(IsPersistent) ->
- IsPersistent
- end.
-
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
record_confirm(MsgSeqNo, XName, State);
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index c51b0913..82776c4a 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -36,8 +36,9 @@ description() ->
{description, <<"AMQP direct exchange, as per the AMQP specification">>}].
route(#exchange{name = Name},
- #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:match_routing_key(Name, RoutingKey).
+ #delivery{message = #basic_message{routing_keys = Routes}}) ->
+ lists:append([rabbit_router:match_routing_key(Name, RKey) ||
+ RKey <- Routes]).
validate(_X) -> ok.
create(_Tx, _X) -> ok.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index c1741b30..2363d05e 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -40,9 +40,11 @@ description() ->
%% NB: This may return duplicate results in some situations (that's ok)
route(#exchange{name = X},
- #delivery{message = #basic_message{routing_key = Key}}) ->
- Words = split_topic_key(Key),
- mnesia:async_dirty(fun trie_match/2, [X, Words]).
+ #delivery{message = #basic_message{routing_keys = Routes}}) ->
+ lists:append([begin
+ Words = split_topic_key(RKey),
+ mnesia:async_dirty(fun trie_match/2, [X, Words])
+ end || RKey <- Routes]).
validate(_X) -> ok.
create(_Tx, _X) -> ok.
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index cfea4982..81f2f07e 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -16,7 +16,7 @@
-module(rabbit_msg_file).
--export([append/3, read/2, scan/2]).
+-export([append/3, read/2, scan/4]).
%%----------------------------------------------------------------------------
@@ -45,9 +45,9 @@
-spec(read/2 :: (io_device(), msg_size()) ->
rabbit_types:ok_or_error2({rabbit_guid:guid(), msg()},
any())).
--spec(scan/2 :: (io_device(), file_size()) ->
- {'ok', [{rabbit_guid:guid(), msg_size(), position()}],
- position()}).
+-spec(scan/4 :: (io_device(), file_size(),
+ fun (({rabbit_guid:guid(), msg_size(), position(), binary()}, A) -> A),
+ A) -> {'ok', A, position()}).
-endif.
@@ -79,43 +79,44 @@ read(FileHdl, TotalSize) ->
KO -> KO
end.
-scan(FileHdl, FileSize) when FileSize >= 0 ->
- scan(FileHdl, FileSize, <<>>, 0, [], 0).
+scan(FileHdl, FileSize, Fun, Acc) when FileSize >= 0 ->
+ scan(FileHdl, FileSize, <<>>, 0, Acc, 0, Fun).
-scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset) ->
+scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset, _Fun) ->
{ok, Acc, ScanOffset};
-scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset) ->
+scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset, Fun) ->
Read = lists:min([?SCAN_BLOCK_SIZE, (FileSize - ReadOffset)]),
case file_handle_cache:read(FileHdl, Read) of
{ok, Data1} ->
{Data2, Acc1, ScanOffset1} =
- scan(<<Data/binary, Data1/binary>>, Acc, ScanOffset),
+ scanner(<<Data/binary, Data1/binary>>, Acc, ScanOffset, Fun),
ReadOffset1 = ReadOffset + size(Data1),
- scan(FileHdl, FileSize, Data2, ReadOffset1, Acc1, ScanOffset1);
+ scan(FileHdl, FileSize, Data2, ReadOffset1, Acc1, ScanOffset1, Fun);
_KO ->
{ok, Acc, ScanOffset}
end.
-scan(<<>>, Acc, Offset) ->
- {<<>>, Acc, Offset};
-scan(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) ->
- {<<>>, Acc, Offset}; %% Nothing to do other than stop.
-scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
- WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset) ->
- TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
- case WriteMarker of
- ?WRITE_OK_MARKER ->
- %% Here we take option 5 from
- %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in
- %% which we read the Guid as a number, and then convert it
- %% back to a binary in order to work around bugs in
- %% Erlang's GC.
- <<GuidNum:?GUID_SIZE_BITS, _Msg/binary>> =
- <<GuidAndMsg:Size/binary>>,
- <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>,
- scan(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize);
- _ ->
- scan(Rest, Acc, Offset + TotalSize)
- end;
-scan(Data, Acc, Offset) ->
- {Data, Acc, Offset}.
+scanner(<<>>, Acc, Offset, _Fun) ->
+ {<<>>, Acc, Offset};
+scanner(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset, _Fun) ->
+ {<<>>, Acc, Offset}; %% Nothing to do other than stop.
+scanner(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
+ WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset, Fun) ->
+ TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
+ case WriteMarker of
+ ?WRITE_OK_MARKER ->
+ %% Here we take option 5 from
+ %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in
+ %% which we read the Guid as a number, and then convert it
+ %% back to a binary in order to work around bugs in
+ %% Erlang's GC.
+ <<GuidNum:?GUID_SIZE_BITS, Msg/binary>> =
+ <<GuidAndMsg:Size/binary>>,
+ <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>,
+ scanner(Rest, Fun({Guid, TotalSize, Offset, Msg}, Acc),
+ Offset + TotalSize, Fun);
+ _ ->
+ scanner(Rest, Acc, Offset + TotalSize, Fun)
+ end;
+scanner(Data, Acc, Offset, _Fun) ->
+ {Data, Acc, Offset}.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 7f3cf35f..a2f6d7e2 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -26,16 +26,20 @@
-export([sync/1, set_maximum_since_use/2,
has_readers/2, combine_files/3, delete_file/2]). %% internal
+-export([transform_dir/3, force_recovery/2]). %% upgrade
+
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]).
%%----------------------------------------------------------------------------
-include("rabbit_msg_store.hrl").
+-include_lib("kernel/include/file.hrl").
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
+-define(TRANSFORM_TMP, "transform_tmp").
-define(BINARY_MODE, [raw, binary]).
-define(READ_MODE, [read]).
@@ -160,6 +164,9 @@
-spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) ->
deletion_thunk()).
-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> deletion_thunk()).
+-spec(force_recovery/2 :: (file:filename(), server()) -> 'ok').
+-spec(transform_dir/3 :: (file:filename(), server(),
+ fun ((binary()) -> ({'ok', msg()} | {error, any()}))) -> 'ok').
-endif.
@@ -1523,7 +1530,8 @@ scan_file_for_valid_messages(Dir, FileName) ->
case open_file(Dir, FileName, ?READ_MODE) of
{ok, Hdl} -> Valid = rabbit_msg_file:scan(
Hdl, filelib:file_size(
- form_filename(Dir, FileName))),
+ form_filename(Dir, FileName)),
+ fun scan_fun/2, []),
%% if something really bad has happened,
%% the close could fail, but ignore
file_handle_cache:close(Hdl),
@@ -1532,6 +1540,9 @@ scan_file_for_valid_messages(Dir, FileName) ->
{error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
end.
+scan_fun({Guid, TotalSize, Offset, _Msg}, Acc) ->
+ [{Guid, TotalSize, Offset} | Acc].
+
%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
%% produces. The list of msgs that is produced is youngest first.
@@ -1956,3 +1967,57 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
{got, FinalOffsetZ},
{destination, Destination}]}
end.
+
+force_recovery(BaseDir, Store) ->
+ Dir = filename:join(BaseDir, atom_to_list(Store)),
+ file:delete(filename:join(Dir, ?CLEAN_FILENAME)),
+ [file:delete(filename:join(Dir, File)) ||
+ File <- list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP)],
+ ok.
+
+for_each_file(D, Fun, Files) ->
+ [Fun(filename:join(D, File)) || File <- Files].
+
+for_each_file(D1, D2, Fun, Files) ->
+ [Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files].
+
+transform_dir(BaseDir, Store, TransformFun) ->
+ Dir = filename:join(BaseDir, atom_to_list(Store)),
+ TmpDir = filename:join(Dir, ?TRANSFORM_TMP),
+ TransformFile = fun (A, B) -> transform_msg_file(A, B, TransformFun) end,
+ case filelib:is_dir(TmpDir) of
+ true -> throw({error, transform_failed_previously});
+ false -> OldFileList = list_sorted_file_names(Dir, ?FILE_EXTENSION),
+ for_each_file(Dir, TmpDir, TransformFile, OldFileList),
+ for_each_file(Dir, fun file:delete/1, OldFileList),
+ NewFileList = list_sorted_file_names(TmpDir, ?FILE_EXTENSION),
+ for_each_file(TmpDir, Dir, fun file:copy/2, NewFileList),
+ for_each_file(TmpDir, fun file:delete/1, NewFileList),
+ ok = file:del_dir(TmpDir)
+ end.
+
+transform_msg_file(FileOld, FileNew, TransformFun) ->
+ rabbit_misc:ensure_parent_dirs_exist(FileNew),
+ {ok, #file_info{size=Size}} = file:read_file_info(FileOld),
+ {ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []),
+ {ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write],
+ [{write_buffer,
+ ?HANDLE_CACHE_BUFFER_SIZE}]),
+ {ok, _Acc, _IgnoreSize} =
+ rabbit_msg_file:scan(
+ RefOld, Size,
+ fun({Guid, _Size, _Offset, BinMsg}, ok) ->
+ case TransformFun(BinMsg) of
+ {ok, MsgNew} ->
+ {ok, _} = rabbit_msg_file:append(RefNew, Guid, MsgNew),
+ ok;
+ {error, Reason} ->
+ error_logger:error_msg("Message transform failed: ~p~n",
+ [Reason]),
+ ok
+ end
+ end, ok),
+ file_handle_cache:close(RefOld),
+ file_handle_cache:close(RefNew),
+ ok.
+
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 09695d95..75507f43 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -700,8 +700,8 @@ test_topic_expect_match(X, List) ->
fun ({Key, Expected}) ->
BinKey = list_to_binary(Key),
Res = rabbit_exchange_type_topic:route(
- X, #delivery{message = #basic_message{routing_key =
- BinKey}}),
+ X, #delivery{message = #basic_message{routing_keys =
+ [BinKey]}}),
ExpectedRes = lists:map(
fun (Q) -> #resource{virtual_host = <<"/">>,
kind = queue,
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 3dbe740f..ab2300c0 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -64,7 +64,7 @@
-type(content() :: undecoded_content() | decoded_content()).
-type(basic_message() ::
#basic_message{exchange_name :: rabbit_exchange:name(),
- routing_key :: rabbit_router:routing_key(),
+ routing_keys :: [rabbit_router:routing_key()],
content :: content(),
guid :: rabbit_guid:guid(),
is_persistent :: boolean()}).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7142d560..b0781f8f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -22,7 +22,7 @@
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1]).
+ status/1, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -294,6 +294,8 @@
%%----------------------------------------------------------------------------
+-rabbit_upgrade({multiple_routing_keys, []}).
+
-ifdef(use_specs).
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
@@ -1801,3 +1803,30 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
push_betas_to_deltas(
Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1)
end.
+
+%%----------------------------------------------------------------------------
+%% Upgrading
+%%----------------------------------------------------------------------------
+
+multiple_routing_keys() ->
+ transform_storage(
+ fun (BinMsg) ->
+ case binary_to_term(BinMsg) of
+ {basic_message, ExchangeName, Routing_Key, Content, Guid,
+ Persistent} ->
+ {ok, {basic_message, ExchangeName, [Routing_Key], Content,
+ Guid, Persistent}};
+ _ ->
+ {error, corrupt_message}
+ end
+ end),
+ ok.
+
+%% Assumes message store is not running
+transform_storage(TransformFun) ->
+ transform_store(?PERSISTENT_MSG_STORE, TransformFun),
+ transform_store(?TRANSIENT_MSG_STORE, TransformFun).
+
+transform_store(Store, TransformFun) ->
+ rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
+ rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).