diff options
-rw-r--r-- | include/rabbit_exchange_type_spec.hrl | 2 | ||||
-rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 3 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/changelog | 6 | ||||
-rw-r--r-- | src/rabbit.erl | 18 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 144 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 1 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 34 | ||||
-rw-r--r-- | src/rabbit_exchange_type.erl | 9 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 3 | ||||
-rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 3 | ||||
-rw-r--r-- | src/rabbit_exchange_type_headers.erl | 3 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 8 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 30 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 5 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 3 | ||||
-rw-r--r-- | src/rabbit_ssl.erl | 5 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 2 |
18 files changed, 145 insertions, 143 deletions
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index 45c475d8..c80cc196 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -21,8 +21,6 @@ -> rabbit_router:match_result()). -spec(validate/1 :: (rabbit_types:exchange()) -> 'ok'). -spec(create/2 :: (boolean(), rabbit_types:exchange()) -> 'ok'). --spec(recover/2 :: (rabbit_types:exchange(), - [rabbit_types:binding()]) -> 'ok'). -spec(delete/3 :: (boolean(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). -spec(add_binding/3 :: (boolean(), rabbit_types:exchange(), diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 45af770a..f9e9df8b 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -120,6 +120,9 @@ done rm -rf %{buildroot} %changelog +* Thu Apr 7 2011 Alexandru Scvortov <alexandru@rabbitmq.com> 2.4.1-1 +- New Upstream Release + * Tue Mar 22 2011 Alexandru Scvortov <alexandru@rabbitmq.com> 2.4.0-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 2ca5074f..0383b955 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (2.4.1-1) lucid; urgency=low + + * New Upstream Release + + -- Alexandru Scvortov <alexandru@rabbitmq.com> Thu, 07 Apr 2011 16:49:22 +0100 + rabbitmq-server (2.4.0-1) lucid; urgency=low * New Upstream Release diff --git a/src/rabbit.erl b/src/rabbit.erl index 807e9e7d..07316138 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -27,7 +27,7 @@ %%--------------------------------------------------------------------------- %% Boot steps. --export([maybe_insert_default_data/0, boot_delegate/0]). +-export([maybe_insert_default_data/0, boot_delegate/0, recover/0]). -rabbit_boot_step({codec_correctness_check, [{description, "codec correctness check"}, @@ -123,15 +123,9 @@ {requires, core_initialized}, {enables, routing_ready}]}). --rabbit_boot_step({exchange_recovery, - [{description, "exchange recovery"}, - {mfa, {rabbit_exchange, recover, []}}, - {requires, empty_db_check}, - {enables, routing_ready}]}). - --rabbit_boot_step({queue_sup_queue_recovery, - [{description, "queue supervisor and queue recovery"}, - {mfa, {rabbit_amqqueue, start, []}}, +-rabbit_boot_step({recovery, + [{description, "exchange, queue and binding recovery"}, + {mfa, {rabbit, recover, []}}, {requires, empty_db_check}, {enables, routing_ready}]}). @@ -186,6 +180,7 @@ -spec(maybe_insert_default_data/0 :: () -> 'ok'). -spec(boot_delegate/0 :: () -> 'ok'). +-spec(recover/0 :: () -> 'ok'). -endif. @@ -464,6 +459,9 @@ boot_delegate() -> {ok, Count} = application:get_env(rabbit, delegate_count), rabbit_sup:start_child(delegate_sup, [Count]). +recover() -> + rabbit_binding:recover(rabbit_exchange:recover(), rabbit_amqqueue:start()). + maybe_insert_default_data() -> case rabbit_mnesia:is_db_empty() of true -> insert_default_data(); diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c7391965..77d3841b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -57,7 +57,7 @@ -type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found'). --spec(start/0 :: () -> 'ok'). +-spec(start/0 :: () -> [name()]). -spec(stop/0 :: () -> 'ok'). -spec(declare/5 :: (name(), boolean(), boolean(), @@ -166,8 +166,7 @@ start() -> {rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, transient, infinity, supervisor, [rabbit_amqqueue_sup]}), - _RealDurableQueues = recover_durable_queues(DurableQueues), - ok. + recover_durable_queues(DurableQueues). stop() -> ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup), @@ -187,8 +186,8 @@ find_durable_queues() -> recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(Q) || Q <- DurableQueues], - [Q || Q <- Qs, - gen_server2:call(Q#amqqueue.pid, {init, true}, infinity) == Q]. + [QName || Q = #amqqueue{name = QName, pid = Pid} <- Qs, + gen_server2:call(Pid, {init, true}, infinity) == {new, Q}]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 6167790e..c2c8dc1f 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -17,7 +17,7 @@ -module(rabbit_binding). -include("rabbit.hrl"). --export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]). +-export([recover/2, exists/1, add/1, add/2, remove/1, remove/2, list/1]). -export([list_for_source/1, list_for_destination/1, list_for_source_and_destination/2]). -export([new_deletions/0, combine_deletions/2, add_deletion/3, @@ -38,24 +38,24 @@ -type(bind_errors() :: rabbit_types:error('source_not_found' | 'destination_not_found' | 'source_and_destination_not_found')). --type(bind_res() :: 'ok' | bind_errors()). +-type(bind_ok_or_error() :: 'ok' | bind_errors() | + rabbit_types:error('binding_not_found')). +-type(bind_res() :: bind_ok_or_error() | rabbit_misc:const(bind_ok_or_error())). -type(inner_fun() :: fun((rabbit_types:exchange(), rabbit_types:exchange() | rabbit_types:amqqueue()) -> rabbit_types:ok_or_error(rabbit_types:amqp_error()))). -type(bindings() :: [rabbit_types:binding()]). --type(add_res() :: bind_res() | rabbit_misc:const(bind_res())). --type(bind_or_error() :: bind_res() | rabbit_types:error('binding_not_found')). --type(remove_res() :: bind_or_error() | rabbit_misc:const(bind_or_error())). -opaque(deletions() :: dict()). --spec(recover/0 :: () -> [rabbit_types:binding()]). +-spec(recover/2 :: ([rabbit_exchange:name()], [rabbit_amqqueue:name()]) -> + 'ok'). -spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()). --spec(add/1 :: (rabbit_types:binding()) -> add_res()). --spec(remove/1 :: (rabbit_types:binding()) -> remove_res()). --spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> add_res()). --spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> remove_res()). +-spec(add/1 :: (rabbit_types:binding()) -> bind_res()). +-spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()). +-spec(remove/1 :: (rabbit_types:binding()) -> bind_res()). +-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()). -spec(list/1 :: (rabbit_types:vhost()) -> bindings()). -spec(list_for_source/1 :: (rabbit_types:binding_source()) -> bindings()). @@ -93,14 +93,27 @@ destination_name, destination_kind, routing_key, arguments]). -recover() -> - rabbit_misc:table_fold( - fun (Route = #route{binding = B}, Acc) -> - {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(rabbit_route, Route, write), - ok = mnesia:write(rabbit_reverse_route, ReverseRoute, write), - [B | Acc] - end, [], rabbit_durable_route). +recover(XNames, QNames) -> + XNameSet = sets:from_list(XNames), + QNameSet = sets:from_list(QNames), + rabbit_misc:table_filter( + fun (#route{binding = #binding{destination = Dst = + #resource{kind = Kind}}}) -> + sets:is_element(Dst, case Kind of + exchange -> XNameSet; + queue -> QNameSet + end) + end, + fun (R = #route{binding = B = #binding{source = Src}}, Tx) -> + case Tx of + true -> ok = sync_transient_binding(R, fun mnesia:write/3); + false -> ok + end, + {ok, X} = rabbit_exchange:lookup(Src), + rabbit_exchange:callback(X, add_binding, [Tx, X, B]) + end, + rabbit_durable_route), + ok. exists(Binding) -> binding_action( @@ -110,8 +123,6 @@ exists(Binding) -> add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end). -remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end). - add(Binding, InnerFun) -> binding_action( Binding, @@ -120,51 +131,46 @@ add(Binding, InnerFun) -> %% in general, we want to fail on that in preference to %% anything else case InnerFun(Src, Dst) of - ok -> - case mnesia:read({rabbit_route, B}) of - [] -> ok = sync_binding(B, all_durable([Src, Dst]), - fun mnesia:write/3), - fun (Tx) -> - ok = rabbit_exchange:callback( - Src, add_binding, [Tx, Src, B]), - rabbit_event:notify_if( - not Tx, binding_created, info(B)) - end; - [_] -> fun rabbit_misc:const_ok/1 - end; - {error, _} = Err -> - rabbit_misc:const(Err) + ok -> case mnesia:read({rabbit_route, B}) of + [] -> add(Src, Dst, B); + [_] -> fun rabbit_misc:const_ok/1 + end; + {error, _} = Err -> rabbit_misc:const(Err) end end). +add(Src, Dst, B) -> + Durable = all_durable([Src, Dst]), + case (not Durable orelse mnesia:read({rabbit_durable_route, B}) =:= []) of + true -> ok = sync_binding(B, Durable, fun mnesia:write/3), + fun (Tx) -> ok = rabbit_exchange:callback(Src, add_binding, + [Tx, Src, B]), + rabbit_event:notify_if(not Tx, binding_created, + info(B)) + end; + false -> rabbit_misc:const({error, binding_not_found}) + end. + +remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end). + remove(Binding, InnerFun) -> binding_action( Binding, fun (Src, Dst, B) -> - Result = - case mnesia:match_object(rabbit_route, #route{binding = B}, - write) of - [] -> - {error, binding_not_found}; - [_] -> - case InnerFun(Src, Dst) of - ok -> - ok = sync_binding(B, all_durable([Src, Dst]), - fun mnesia:delete_object/3), - {ok, maybe_auto_delete(B#binding.source, - [B], new_deletions())}; - {error, _} = E -> - E - end - end, - case Result of - {error, _} = Err -> - rabbit_misc:const(Err); - {ok, Deletions} -> - fun (Tx) -> ok = process_deletions(Deletions, Tx) end + case mnesia:read(rabbit_route, B, write) of + [] -> rabbit_misc:const({error, binding_not_found}); + [_] -> case InnerFun(Src, Dst) of + ok -> remove(Src, Dst, B); + {error, _} = Err -> rabbit_misc:const(Err) + end end end). +remove(Src, Dst, B) -> + ok = sync_binding(B, all_durable([Src, Dst]), fun mnesia:delete_object/3), + Deletions = maybe_auto_delete(B#binding.source, [B], new_deletions()), + fun (Tx) -> ok = process_deletions(Deletions, Tx) end. + list(VHostPath) -> VHostResource = rabbit_misc:r(VHostPath, '_'), Route = #route{binding = #binding{source = VHostResource, @@ -259,31 +265,31 @@ binding_action(Binding = #binding{source = SrcName, Fun(Src, Dst, Binding#binding{args = SortedArgs}) end). -sync_binding(Binding, Durable, Fun) -> - ok = case Durable of - true -> Fun(rabbit_durable_route, - #route{binding = Binding}, write); - false -> ok - end, +sync_binding(Binding, true, Fun) -> + ok = Fun(rabbit_durable_route, #route{binding = Binding}, write), + ok = sync_transient_binding(Binding, Fun); + +sync_binding(Binding, false, Fun) -> + ok = sync_transient_binding(Binding, Fun). + +sync_transient_binding(Binding, Fun) -> {Route, ReverseRoute} = route_with_reverse(Binding), ok = Fun(rabbit_route, Route, write), - ok = Fun(rabbit_reverse_route, ReverseRoute, write), - ok. + ok = Fun(rabbit_reverse_route, ReverseRoute, write). call_with_source_and_destination(SrcName, DstName, Fun) -> SrcTable = table_for_resource(SrcName), DstTable = table_for_resource(DstName), - ErrFun = fun (Err) -> rabbit_misc:const(Err) end, + ErrFun = fun (Err) -> rabbit_misc:const({error, Err}) end, rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case {mnesia:read({SrcTable, SrcName}), mnesia:read({DstTable, DstName})} of {[Src], [Dst]} -> Fun(Src, Dst); - {[], [_] } -> ErrFun({error, source_not_found}); - {[_], [] } -> ErrFun({error, destination_not_found}); - {[], [] } -> ErrFun({error, - source_and_destination_not_found}) - end + {[], [_] } -> ErrFun(source_not_found); + {[_], [] } -> ErrFun(destination_not_found); + {[], [] } -> ErrFun(source_and_destination_not_found) + end end). table_for_resource(#resource{kind = exchange}) -> rabbit_exchange; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8a234c0f..bfd779ee 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -156,6 +156,7 @@ ready_for_close(Pid) -> init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, Capabilities, CollectorPid, StartLimiterFun]) -> + process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), StatsTimer = rabbit_event:init_stats_timer(), State = #ch{state = starting, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 9d9b07af..42111773 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -36,7 +36,7 @@ -type(type() :: atom()). -type(fun_name() :: atom()). --spec(recover/0 :: () -> 'ok'). +-spec(recover/0 :: () -> [name()]). -spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok'). -spec(declare/6 :: (name(), type(), boolean(), boolean(), boolean(), @@ -83,25 +83,19 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]). recover() -> - Xs = rabbit_misc:table_fold( - fun (X, Acc) -> - ok = mnesia:write(rabbit_exchange, X, write), - [X | Acc] - end, [], rabbit_durable_exchange), - Bs = rabbit_binding:recover(), - recover_with_bindings( - lists:keysort(#binding.source, Bs), - lists:keysort(#exchange.name, Xs), []). - -recover_with_bindings([B = #binding{source = XName} | Rest], - Xs = [#exchange{name = XName} | _], - Bindings) -> - recover_with_bindings(Rest, Xs, [B | Bindings]); -recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> - (type_to_module(Type)):recover(X, Bindings), - recover_with_bindings(Bs, Xs, []); -recover_with_bindings([], [], []) -> - ok. + Xs = rabbit_misc:table_filter( + fun (#exchange{name = XName}) -> + mnesia:read({rabbit_exchange, XName}) =:= [] + end, + fun (X, Tx) -> + case Tx of + true -> ok = mnesia:write(rabbit_exchange, X, write); + false -> ok + end, + rabbit_exchange:callback(X, create, [Tx, X]) + end, + rabbit_durable_exchange), + [XName || #exchange{name = XName} <- Xs]. callback(#exchange{type = XType}, Fun, Args) -> apply(type_to_module(XType), Fun, Args). diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 547583e9..cd96407c 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -26,16 +26,13 @@ behaviour_info(callbacks) -> %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} {validate, 1}, - %% called after declaration when previously absent + %% called after declaration and recovery {create, 2}, - %% called when recovering - {recover, 2}, - - %% called after exchange deletion. + %% called after exchange (auto)deletion. {delete, 3}, - %% called after a binding has been added + %% called after a binding has been added or recovered {add_binding, 3}, %% called after bindings have been deleted. diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 349c2f6e..40078b1a 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/2, recover/2, delete/3, +-export([validate/1, create/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -41,7 +41,6 @@ route(#exchange{name = Name}, validate(_X) -> ok. create(_Tx, _X) -> ok. -recover(_X, _Bs) -> ok. delete(_Tx, _X, _Bs) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index bc5293c8..f32ef917 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/2, recover/2, delete/3, add_binding/3, +-export([validate/1, create/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -40,7 +40,6 @@ route(#exchange{name = Name}, _Delivery) -> validate(_X) -> ok. create(_Tx, _X) -> ok. -recover(_X, _Bs) -> ok. delete(_Tx, _X, _Bs) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index d3529b06..139feb04 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -21,7 +21,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/2, recover/2, delete/3, add_binding/3, +-export([validate/1, create/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -114,7 +114,6 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], validate(_X) -> ok. create(_Tx, _X) -> ok. -recover(_X, _Bs) -> ok. delete(_Tx, _X, _Bs) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index c192f8cf..74c566b8 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -21,7 +21,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/2, recover/2, delete/3, add_binding/3, +-export([validate/1, create/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -49,12 +49,6 @@ route(#exchange{name = X}, validate(_X) -> ok. create(_Tx, _X) -> ok. -recover(_Exchange, Bs) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - lists:foreach(fun (B) -> internal_add_binding(B) end, Bs) - end). - delete(true, #exchange{name = X}, _Bs) -> trie_remove_all_edges(X), trie_remove_all_bindings(X), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 1daeeb2a..cec10ff6 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -38,7 +38,7 @@ -export([ensure_ok/2]). -export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]). -export([upmap/2, map_in_order/2]). --export([table_fold/3]). +-export([table_filter/3]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([read_term_file/1, write_term_file/2]). -export([append_file/2, ensure_parent_dirs_exist/1]). @@ -145,7 +145,8 @@ -> atom()). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). --spec(table_fold/3 :: (fun ((any(), A) -> A), A, atom()) -> A). +-spec(table_filter/3:: (fun ((A) -> boolean()), fun ((A, boolean()) -> 'ok'), + atom()) -> [A]). -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). @@ -459,20 +460,23 @@ map_in_order(F, L) -> lists:reverse( lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)). -%% Fold over each entry in a table, executing the cons function in a -%% transaction. This is often far more efficient than wrapping a tx -%% around the lot. +%% Apply a pre-post-commit function to all entries in a table that +%% satisfy a predicate, and return those entries. %% %% We ignore entries that have been modified or removed. -table_fold(F, Acc0, TableName) -> +table_filter(Pred, PrePostCommitFun, TableName) -> lists:foldl( - fun (E, Acc) -> execute_mnesia_transaction( - fun () -> case mnesia:match_object(TableName, E, read) of - [] -> Acc; - _ -> F(E, Acc) - end - end) - end, Acc0, dirty_read_all(TableName)). + fun (E, Acc) -> + case execute_mnesia_transaction( + fun () -> mnesia:match_object(TableName, E, read) =/= [] + andalso Pred(E) end, + fun (false, _Tx) -> false; + (true, Tx) -> PrePostCommitFun(E, Tx), true + end) of + false -> Acc; + true -> [E | Acc] + end + end, [], dirty_read_all(TableName)). dirty_read_all(TableName) -> mnesia:dirty_select(TableName, [{'$1',[],['$1']}]). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 65688142..3f4162cd 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1891,7 +1891,10 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, force_recovery(BaseDir, Store) -> Dir = filename:join(BaseDir, atom_to_list(Store)), - ok = file:delete(filename:join(Dir, ?CLEAN_FILENAME)), + case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of + ok -> ok; + {error, enoent} -> ok + end, recover_crashed_compactions(BaseDir), ok. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 609bb43f..42af91a8 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -681,7 +681,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, State#v1{connection_state = running, connection = NewConnection}), rabbit_event:notify(connection_created, - infos(?CREATION_EVENT_KEYS, State1)), + [{type, network} | + infos(?CREATION_EVENT_KEYS, State1)]), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State1) end), State1; diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index a3cd2b37..e0defa9e 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -211,7 +211,8 @@ format_asn1_value(V) -> %% subset of ASCII it is also a subset of UTF-8. The others need %% converting. Fortunately since the Erlang SSL library does the %% decoding for us (albeit into a weird format, see below), we just -%% need to handle encoding into UTF-8. +%% need to handle encoding into UTF-8. Note also that utf8Strings come +%% back as binary. %% %% Note for testing: the default Ubuntu configuration for openssl will %% only create printableString or teletexString types no matter what @@ -225,7 +226,7 @@ format_directory_string(printableString, S) -> S; format_directory_string(teletexString, S) -> utf8_list_from(S); format_directory_string(bmpString, S) -> utf8_list_from(S); format_directory_string(universalString, S) -> utf8_list_from(S); -format_directory_string(utf8String, S) -> S. +format_directory_string(utf8String, S) -> binary_to_list(S). utf8_list_from(S) -> binary_to_list( diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 294fae97..38492984 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2320,7 +2320,7 @@ test_queue_recover() -> after 10000 -> exit(timeout_waiting_for_queue_death) end, rabbit_amqqueue:stop(), - ok = rabbit_amqqueue:start(), + rabbit_amqqueue:start(), rabbit_amqqueue:with_or_die( QName, fun (Q1 = #amqqueue { pid = QPid1 }) -> |