summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-02-15 15:27:52 +0000
committerEmile Joubert <emile@rabbitmq.com>2011-02-15 15:27:52 +0000
commit17ebfb85ebc28c01dfc29e7089dbbf6d1688bc6c (patch)
treef7e3dbdb6c198be987a52389c06fdba25fb79abd
parent394c73b033ca71d98b0572317852b107abe97a38 (diff)
downloadrabbitmq-server-17ebfb85ebc28c01dfc29e7089dbbf6d1688bc6c.tar.gz
Sender-specified distribution updates
-rw-r--r--include/rabbit_backing_queue_spec.hrl4
-rw-r--r--src/rabbit_basic.erl41
-rw-r--r--src/rabbit_channel.erl36
-rw-r--r--src/rabbit_msg_store.erl42
-rw-r--r--src/rabbit_upgrade_functions.erl19
-rw-r--r--src/rabbit_variable_queue.erl18
6 files changed, 93 insertions, 67 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 52ffd413..17cdedc2 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -65,6 +65,4 @@
-spec(idle_timeout/1 :: (state()) -> state()).
-spec(handle_pre_hibernate/1 :: (state()) -> state()).
-spec(status/1 :: (state()) -> [{atom(), any()}]).
--spec(transform_storage/1 ::
- (fun ((binary()) -> (rabbit_types:ok_or_error2(any(), any())))) ->
- non_neg_integer()).
+-spec(multiple_routing_keys/0 :: () -> 'ok').
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 7fa68882..503f01bc 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -31,6 +31,7 @@
-type(publish_result() ::
({ok, rabbit_router:routing_result(), [pid()]}
| rabbit_types:error('not_found'))).
+-type(msg_or_error() :: {'ok', rabbit_types:message()} | {'error', any()}).
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
@@ -40,10 +41,10 @@
rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
- properties_input(), binary()) -> rabbit_types:message()).
+ properties_input(), binary()) -> msg_or_error()).
-spec(message/3 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
- rabbit_types:decoded_content()) -> rabbit_types:message()).
+ rabbit_types:decoded_content()) -> msg_or_error()).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
-spec(publish/4 ::
@@ -111,17 +112,23 @@ strip_header(DecodedContent, _Key) ->
message(ExchangeName, RoutingKey,
#content{properties = Props} = DecodedContent) ->
- #basic_message{
- exchange_name = ExchangeName,
- content = strip_header(DecodedContent, ?DELETED_HEADER),
- guid = rabbit_guid:guid(),
- is_persistent = is_message_persistent(DecodedContent),
- routing_keys = [RoutingKey | header_routes(Props#'P_basic'.headers)]}.
-
-message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
+ try
+ {ok, #basic_message{
+ exchange_name = ExchangeName,
+ content = strip_header(DecodedContent, ?DELETED_HEADER),
+ guid = rabbit_guid:guid(),
+ is_persistent = is_message_persistent(DecodedContent),
+ routing_keys = [RoutingKey |
+ header_routes(Props#'P_basic'.headers)]}}
+ catch
+ {error, _Reason} = Error -> Error
+ end.
+
+message(ExchangeName, RoutingKey, RawProperties, BodyBin) ->
Properties = properties(RawProperties),
Content = build_content(Properties, BodyBin),
- message(ExchangeName, RoutingKeyBin, Content).
+ {ok, Msg} = message(ExchangeName, RoutingKey, Content),
+ Msg.
properties(P = #'P_basic'{}) ->
P;
@@ -170,8 +177,12 @@ is_message_persistent(#content{properties = #'P_basic'{
header_routes(undefined) ->
[];
header_routes(HeadersTable) ->
- lists:append([case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of
- {array, Routes} -> [Route || {longstr, Route} <- Routes];
- _ -> []
- end || HeaderKey <- ?ROUTING_HEADERS]).
+ lists:append(
+ [case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of
+ {array, Routes} -> [Route || {longstr, Route} <- Routes];
+ undefined -> [];
+ {Type, _Val} -> throw({error, {unacceptable_type_in_header,
+ Type,
+ binary_to_list(HeaderKey)}})
+ end || HeaderKey <- ?ROUTING_HEADERS]).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 16a3911d..162580ec 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -555,21 +555,27 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
true -> SeqNo = State#ch.publish_seqno,
{SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
- Message = rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent),
- {RoutingRes, DeliveredQPids} =
- rabbit_exchange:publish(
- Exchange,
- rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
- MsgSeqNo)),
- State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName,
- MsgSeqNo, Message, State1),
- maybe_incr_stats([{ExchangeName, 1} |
- [{{QPid, ExchangeName}, 1} ||
- QPid <- DeliveredQPids]], publish, State2),
- {noreply, case TxnKey of
- none -> State2;
- _ -> add_tx_participants(DeliveredQPids, State2)
- end};
+ case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
+ {ok, Message} ->
+ {RoutingRes, DeliveredQPids} =
+ rabbit_exchange:publish(
+ Exchange,
+ rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
+ MsgSeqNo)),
+ State2 = process_routing_result(RoutingRes, DeliveredQPids,
+ ExchangeName, MsgSeqNo, Message,
+ State1),
+ maybe_incr_stats([{ExchangeName, 1} |
+ [{{QPid, ExchangeName}, 1} ||
+ QPid <- DeliveredQPids]], publish, State2),
+ {noreply, case TxnKey of
+ none -> State2;
+ _ -> add_tx_participants(DeliveredQPids, State2)
+ end};
+ {error, Reason} ->
+ rabbit_misc:protocol_error(precondition_failed,
+ "invalid message: ~p", [Reason])
+ end;
handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
multiple = Multiple,
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index f7afbef5..00c2ab18 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1968,29 +1968,43 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
{destination, Destination}]}
end.
-force_recovery(BaseDir, Server) ->
- Dir = filename:join(BaseDir, atom_to_list(Server)),
+force_recovery(BaseDir, Store) ->
+ Dir = filename:join(BaseDir, atom_to_list(Store)),
file:delete(filename:join(Dir, ?CLEAN_FILENAME)),
[file:delete(filename:join(Dir, File)) ||
File <- list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP)],
ok.
-transform_dir(BaseDir, Server, TransformFun) ->
- Dir = filename:join(BaseDir, atom_to_list(Server)),
+for_each_file(Files, Fun) ->
+ [Fun(File) || File <- Files].
+
+transform_dir(BaseDir, Store, TransformFun) ->
+ Dir = filename:join(BaseDir, atom_to_list(Store)),
TmpDir = filename:join(Dir, ?TRANSFORM_TMP),
case filelib:is_dir(TmpDir) of
true -> throw({error, transform_failed_previously});
false ->
- [transform_msg_file(filename:join(Dir, File),
- filename:join(TmpDir, File),
- TransformFun) ||
- File <- list_sorted_file_names(Dir, ?FILE_EXTENSION)],
- [file:delete(filename:join(Dir, File)) ||
- File <- list_sorted_file_names(Dir, ?FILE_EXTENSION)],
- [file:copy(filename:join(TmpDir, File), filename:join(Dir, File)) ||
- File <- list_sorted_file_names(TmpDir, ?FILE_EXTENSION)],
- [file:delete(filename:join(TmpDir, File)) ||
- File <- list_sorted_file_names(TmpDir, ?FILE_EXTENSION)],
+ OldFileList = list_sorted_file_names(Dir, ?FILE_EXTENSION),
+ for_each_file(OldFileList,
+ fun (File) ->
+ transform_msg_file(filename:join(Dir, File),
+ filename:join(TmpDir, File),
+ TransformFun)
+ end),
+ for_each_file(OldFileList,
+ fun (File) ->
+ file:delete(filename:join(Dir, File))
+ end),
+ NewFileList = list_sorted_file_names(TmpDir, ?FILE_EXTENSION),
+ for_each_file(NewFileList,
+ fun (File) ->
+ file:copy(filename:join(TmpDir, File),
+ filename:join(Dir, File))
+ end),
+ for_each_file(NewFileList,
+ fun (File) ->
+ file:delete(filename:join(TmpDir, File))
+ end),
ok = file:del_dir(TmpDir)
end.
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 73f59557..68b88b3e 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -25,7 +25,6 @@
-rabbit_upgrade({add_ip_to_listener, []}).
-rabbit_upgrade({internal_exchanges, []}).
-rabbit_upgrade({user_to_internal_user, [hash_passwords]}).
--rabbit_upgrade({multiple_routing_keys, []}).
%% -------------------------------------------------------------------
@@ -36,7 +35,6 @@
-spec(add_ip_to_listener/0 :: () -> 'ok').
-spec(internal_exchanges/0 :: () -> 'ok').
-spec(user_to_internal_user/0 :: () -> 'ok').
--spec(multiple_routing_keys/0 :: () -> 'ok').
-endif.
@@ -103,20 +101,3 @@ mnesia(TableName, Fun, FieldList, NewRecordName) ->
{atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList,
NewRecordName),
ok.
-
-%%--------------------------------------------------------------------
-
-multiple_routing_keys() ->
- rabbit_variable_queue:transform_storage(
- fun (BinMsg) ->
- case binary_to_term(BinMsg) of
- {basic_message, ExchangeName, Routing_Key, Content, Guid,
- Persistent} ->
- {ok, {basic_message, ExchangeName, [Routing_Key], Content,
- Guid, Persistent}};
- _ ->
- {error, corrupt_message}
- end
- end),
- ok.
-
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index dee6a8e5..b0781f8f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -22,7 +22,7 @@
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1, transform_storage/1]).
+ status/1, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -294,6 +294,8 @@
%%----------------------------------------------------------------------------
+-rabbit_upgrade({multiple_routing_keys, []}).
+
-ifdef(use_specs).
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
@@ -1806,6 +1808,20 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
%% Upgrading
%%----------------------------------------------------------------------------
+multiple_routing_keys() ->
+ transform_storage(
+ fun (BinMsg) ->
+ case binary_to_term(BinMsg) of
+ {basic_message, ExchangeName, Routing_Key, Content, Guid,
+ Persistent} ->
+ {ok, {basic_message, ExchangeName, [Routing_Key], Content,
+ Guid, Persistent}};
+ _ ->
+ {error, corrupt_message}
+ end
+ end),
+ ok.
+
%% Assumes message store is not running
transform_storage(TransformFun) ->
transform_store(?PERSISTENT_MSG_STORE, TransformFun),