diff options
author | Emile Joubert <emile@rabbitmq.com> | 2011-02-15 15:27:52 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2011-02-15 15:27:52 +0000 |
commit | 17ebfb85ebc28c01dfc29e7089dbbf6d1688bc6c (patch) | |
tree | f7e3dbdb6c198be987a52389c06fdba25fb79abd /src | |
parent | 394c73b033ca71d98b0572317852b107abe97a38 (diff) | |
download | rabbitmq-server-17ebfb85ebc28c01dfc29e7089dbbf6d1688bc6c.tar.gz |
Sender-specified distribution updates
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_basic.erl | 41 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 36 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 42 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 19 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 18 |
5 files changed, 92 insertions, 64 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 7fa68882..503f01bc 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -31,6 +31,7 @@ -type(publish_result() :: ({ok, rabbit_router:routing_result(), [pid()]} | rabbit_types:error('not_found'))). +-type(msg_or_error() :: {'ok', rabbit_types:message()} | {'error', any()}). -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). @@ -40,10 +41,10 @@ rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), - properties_input(), binary()) -> rabbit_types:message()). + properties_input(), binary()) -> msg_or_error()). -spec(message/3 :: (rabbit_exchange:name(), rabbit_router:routing_key(), - rabbit_types:decoded_content()) -> rabbit_types:message()). + rabbit_types:decoded_content()) -> msg_or_error()). -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). -spec(publish/4 :: @@ -111,17 +112,23 @@ strip_header(DecodedContent, _Key) -> message(ExchangeName, RoutingKey, #content{properties = Props} = DecodedContent) -> - #basic_message{ - exchange_name = ExchangeName, - content = strip_header(DecodedContent, ?DELETED_HEADER), - guid = rabbit_guid:guid(), - is_persistent = is_message_persistent(DecodedContent), - routing_keys = [RoutingKey | header_routes(Props#'P_basic'.headers)]}. - -message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> + try + {ok, #basic_message{ + exchange_name = ExchangeName, + content = strip_header(DecodedContent, ?DELETED_HEADER), + guid = rabbit_guid:guid(), + is_persistent = is_message_persistent(DecodedContent), + routing_keys = [RoutingKey | + header_routes(Props#'P_basic'.headers)]}} + catch + {error, _Reason} = Error -> Error + end. + +message(ExchangeName, RoutingKey, RawProperties, BodyBin) -> Properties = properties(RawProperties), Content = build_content(Properties, BodyBin), - message(ExchangeName, RoutingKeyBin, Content). + {ok, Msg} = message(ExchangeName, RoutingKey, Content), + Msg. properties(P = #'P_basic'{}) -> P; @@ -170,8 +177,12 @@ is_message_persistent(#content{properties = #'P_basic'{ header_routes(undefined) -> []; header_routes(HeadersTable) -> - lists:append([case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of - {array, Routes} -> [Route || {longstr, Route} <- Routes]; - _ -> [] - end || HeaderKey <- ?ROUTING_HEADERS]). + lists:append( + [case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of + {array, Routes} -> [Route || {longstr, Route} <- Routes]; + undefined -> []; + {Type, _Val} -> throw({error, {unacceptable_type_in_header, + Type, + binary_to_list(HeaderKey)}}) + end || HeaderKey <- ?ROUTING_HEADERS]). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 16a3911d..162580ec 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -555,21 +555,27 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, true -> SeqNo = State#ch.publish_seqno, {SeqNo, State#ch{publish_seqno = SeqNo + 1}} end, - Message = rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent), - {RoutingRes, DeliveredQPids} = - rabbit_exchange:publish( - Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, - MsgSeqNo)), - State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName, - MsgSeqNo, Message, State1), - maybe_incr_stats([{ExchangeName, 1} | - [{{QPid, ExchangeName}, 1} || - QPid <- DeliveredQPids]], publish, State2), - {noreply, case TxnKey of - none -> State2; - _ -> add_tx_participants(DeliveredQPids, State2) - end}; + case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of + {ok, Message} -> + {RoutingRes, DeliveredQPids} = + rabbit_exchange:publish( + Exchange, + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, + MsgSeqNo)), + State2 = process_routing_result(RoutingRes, DeliveredQPids, + ExchangeName, MsgSeqNo, Message, + State1), + maybe_incr_stats([{ExchangeName, 1} | + [{{QPid, ExchangeName}, 1} || + QPid <- DeliveredQPids]], publish, State2), + {noreply, case TxnKey of + none -> State2; + _ -> add_tx_participants(DeliveredQPids, State2) + end}; + {error, Reason} -> + rabbit_misc:protocol_error(precondition_failed, + "invalid message: ~p", [Reason]) + end; handle_method(#'basic.nack'{delivery_tag = DeliveryTag, multiple = Multiple, diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index f7afbef5..00c2ab18 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1968,29 +1968,43 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, {destination, Destination}]} end. -force_recovery(BaseDir, Server) -> - Dir = filename:join(BaseDir, atom_to_list(Server)), +force_recovery(BaseDir, Store) -> + Dir = filename:join(BaseDir, atom_to_list(Store)), file:delete(filename:join(Dir, ?CLEAN_FILENAME)), [file:delete(filename:join(Dir, File)) || File <- list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP)], ok. -transform_dir(BaseDir, Server, TransformFun) -> - Dir = filename:join(BaseDir, atom_to_list(Server)), +for_each_file(Files, Fun) -> + [Fun(File) || File <- Files]. + +transform_dir(BaseDir, Store, TransformFun) -> + Dir = filename:join(BaseDir, atom_to_list(Store)), TmpDir = filename:join(Dir, ?TRANSFORM_TMP), case filelib:is_dir(TmpDir) of true -> throw({error, transform_failed_previously}); false -> - [transform_msg_file(filename:join(Dir, File), - filename:join(TmpDir, File), - TransformFun) || - File <- list_sorted_file_names(Dir, ?FILE_EXTENSION)], - [file:delete(filename:join(Dir, File)) || - File <- list_sorted_file_names(Dir, ?FILE_EXTENSION)], - [file:copy(filename:join(TmpDir, File), filename:join(Dir, File)) || - File <- list_sorted_file_names(TmpDir, ?FILE_EXTENSION)], - [file:delete(filename:join(TmpDir, File)) || - File <- list_sorted_file_names(TmpDir, ?FILE_EXTENSION)], + OldFileList = list_sorted_file_names(Dir, ?FILE_EXTENSION), + for_each_file(OldFileList, + fun (File) -> + transform_msg_file(filename:join(Dir, File), + filename:join(TmpDir, File), + TransformFun) + end), + for_each_file(OldFileList, + fun (File) -> + file:delete(filename:join(Dir, File)) + end), + NewFileList = list_sorted_file_names(TmpDir, ?FILE_EXTENSION), + for_each_file(NewFileList, + fun (File) -> + file:copy(filename:join(TmpDir, File), + filename:join(Dir, File)) + end), + for_each_file(NewFileList, + fun (File) -> + file:delete(filename:join(TmpDir, File)) + end), ok = file:del_dir(TmpDir) end. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 73f59557..68b88b3e 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -25,7 +25,6 @@ -rabbit_upgrade({add_ip_to_listener, []}). -rabbit_upgrade({internal_exchanges, []}). -rabbit_upgrade({user_to_internal_user, [hash_passwords]}). --rabbit_upgrade({multiple_routing_keys, []}). %% ------------------------------------------------------------------- @@ -36,7 +35,6 @@ -spec(add_ip_to_listener/0 :: () -> 'ok'). -spec(internal_exchanges/0 :: () -> 'ok'). -spec(user_to_internal_user/0 :: () -> 'ok'). --spec(multiple_routing_keys/0 :: () -> 'ok'). -endif. @@ -103,20 +101,3 @@ mnesia(TableName, Fun, FieldList, NewRecordName) -> {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList, NewRecordName), ok. - -%%-------------------------------------------------------------------- - -multiple_routing_keys() -> - rabbit_variable_queue:transform_storage( - fun (BinMsg) -> - case binary_to_term(BinMsg) of - {basic_message, ExchangeName, Routing_Key, Content, Guid, - Persistent} -> - {ok, {basic_message, ExchangeName, [Routing_Key], Content, - Guid, Persistent}}; - _ -> - {error, corrupt_message} - end - end), - ok. - diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index dee6a8e5..b0781f8f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -22,7 +22,7 @@ requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1, transform_storage/1]). + status/1, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -294,6 +294,8 @@ %%---------------------------------------------------------------------------- +-rabbit_upgrade({multiple_routing_keys, []}). + -ifdef(use_specs). -type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). @@ -1806,6 +1808,20 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> %% Upgrading %%---------------------------------------------------------------------------- +multiple_routing_keys() -> + transform_storage( + fun (BinMsg) -> + case binary_to_term(BinMsg) of + {basic_message, ExchangeName, Routing_Key, Content, Guid, + Persistent} -> + {ok, {basic_message, ExchangeName, [Routing_Key], Content, + Guid, Persistent}}; + _ -> + {error, corrupt_message} + end + end), + ok. + %% Assumes message store is not running transform_storage(TransformFun) -> transform_store(?PERSISTENT_MSG_STORE, TransformFun), |