summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit_exchange_type_spec.hrl2
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--src/rabbit.erl18
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_binding.erl144
-rw-r--r--src/rabbit_channel.erl1
-rw-r--r--src/rabbit_exchange.erl34
-rw-r--r--src/rabbit_exchange_type.erl9
-rw-r--r--src/rabbit_exchange_type_direct.erl3
-rw-r--r--src/rabbit_exchange_type_fanout.erl3
-rw-r--r--src/rabbit_exchange_type_headers.erl3
-rw-r--r--src/rabbit_exchange_type_topic.erl8
-rw-r--r--src/rabbit_misc.erl30
-rw-r--r--src/rabbit_msg_store.erl5
-rw-r--r--src/rabbit_reader.erl3
-rw-r--r--src/rabbit_ssl.erl5
-rw-r--r--src/rabbit_tests.erl2
18 files changed, 145 insertions, 143 deletions
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index 45c475d8..c80cc196 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -21,8 +21,6 @@
-> rabbit_router:match_result()).
-spec(validate/1 :: (rabbit_types:exchange()) -> 'ok').
-spec(create/2 :: (boolean(), rabbit_types:exchange()) -> 'ok').
--spec(recover/2 :: (rabbit_types:exchange(),
- [rabbit_types:binding()]) -> 'ok').
-spec(delete/3 :: (boolean(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
-spec(add_binding/3 :: (boolean(), rabbit_types:exchange(),
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 45af770a..f9e9df8b 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -120,6 +120,9 @@ done
rm -rf %{buildroot}
%changelog
+* Thu Apr 7 2011 Alexandru Scvortov <alexandru@rabbitmq.com> 2.4.1-1
+- New Upstream Release
+
* Tue Mar 22 2011 Alexandru Scvortov <alexandru@rabbitmq.com> 2.4.0-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 2ca5074f..0383b955 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (2.4.1-1) lucid; urgency=low
+
+ * New Upstream Release
+
+ -- Alexandru Scvortov <alexandru@rabbitmq.com> Thu, 07 Apr 2011 16:49:22 +0100
+
rabbitmq-server (2.4.0-1) lucid; urgency=low
* New Upstream Release
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 807e9e7d..07316138 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -27,7 +27,7 @@
%%---------------------------------------------------------------------------
%% Boot steps.
--export([maybe_insert_default_data/0, boot_delegate/0]).
+-export([maybe_insert_default_data/0, boot_delegate/0, recover/0]).
-rabbit_boot_step({codec_correctness_check,
[{description, "codec correctness check"},
@@ -123,15 +123,9 @@
{requires, core_initialized},
{enables, routing_ready}]}).
--rabbit_boot_step({exchange_recovery,
- [{description, "exchange recovery"},
- {mfa, {rabbit_exchange, recover, []}},
- {requires, empty_db_check},
- {enables, routing_ready}]}).
-
--rabbit_boot_step({queue_sup_queue_recovery,
- [{description, "queue supervisor and queue recovery"},
- {mfa, {rabbit_amqqueue, start, []}},
+-rabbit_boot_step({recovery,
+ [{description, "exchange, queue and binding recovery"},
+ {mfa, {rabbit, recover, []}},
{requires, empty_db_check},
{enables, routing_ready}]}).
@@ -186,6 +180,7 @@
-spec(maybe_insert_default_data/0 :: () -> 'ok').
-spec(boot_delegate/0 :: () -> 'ok').
+-spec(recover/0 :: () -> 'ok').
-endif.
@@ -464,6 +459,9 @@ boot_delegate() ->
{ok, Count} = application:get_env(rabbit, delegate_count),
rabbit_sup:start_child(delegate_sup, [Count]).
+recover() ->
+ rabbit_binding:recover(rabbit_exchange:recover(), rabbit_amqqueue:start()).
+
maybe_insert_default_data() ->
case rabbit_mnesia:is_db_empty() of
true -> insert_default_data();
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c7391965..77d3841b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -57,7 +57,7 @@
-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
--spec(start/0 :: () -> 'ok').
+-spec(start/0 :: () -> [name()]).
-spec(stop/0 :: () -> 'ok').
-spec(declare/5 ::
(name(), boolean(), boolean(),
@@ -166,8 +166,7 @@ start() ->
{rabbit_amqqueue_sup,
{rabbit_amqqueue_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup]}),
- _RealDurableQueues = recover_durable_queues(DurableQueues),
- ok.
+ recover_durable_queues(DurableQueues).
stop() ->
ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup),
@@ -187,8 +186,8 @@ find_durable_queues() ->
recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
- [Q || Q <- Qs,
- gen_server2:call(Q#amqqueue.pid, {init, true}, infinity) == Q].
+ [QName || Q = #amqqueue{name = QName, pid = Pid} <- Qs,
+ gen_server2:call(Pid, {init, true}, infinity) == {new, Q}].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 6167790e..c2c8dc1f 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -17,7 +17,7 @@
-module(rabbit_binding).
-include("rabbit.hrl").
--export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]).
+-export([recover/2, exists/1, add/1, add/2, remove/1, remove/2, list/1]).
-export([list_for_source/1, list_for_destination/1,
list_for_source_and_destination/2]).
-export([new_deletions/0, combine_deletions/2, add_deletion/3,
@@ -38,24 +38,24 @@
-type(bind_errors() :: rabbit_types:error('source_not_found' |
'destination_not_found' |
'source_and_destination_not_found')).
--type(bind_res() :: 'ok' | bind_errors()).
+-type(bind_ok_or_error() :: 'ok' | bind_errors() |
+ rabbit_types:error('binding_not_found')).
+-type(bind_res() :: bind_ok_or_error() | rabbit_misc:const(bind_ok_or_error())).
-type(inner_fun() ::
fun((rabbit_types:exchange(),
rabbit_types:exchange() | rabbit_types:amqqueue()) ->
rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
-type(bindings() :: [rabbit_types:binding()]).
--type(add_res() :: bind_res() | rabbit_misc:const(bind_res())).
--type(bind_or_error() :: bind_res() | rabbit_types:error('binding_not_found')).
--type(remove_res() :: bind_or_error() | rabbit_misc:const(bind_or_error())).
-opaque(deletions() :: dict()).
--spec(recover/0 :: () -> [rabbit_types:binding()]).
+-spec(recover/2 :: ([rabbit_exchange:name()], [rabbit_amqqueue:name()]) ->
+ 'ok').
-spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()).
--spec(add/1 :: (rabbit_types:binding()) -> add_res()).
--spec(remove/1 :: (rabbit_types:binding()) -> remove_res()).
--spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> add_res()).
--spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> remove_res()).
+-spec(add/1 :: (rabbit_types:binding()) -> bind_res()).
+-spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()).
+-spec(remove/1 :: (rabbit_types:binding()) -> bind_res()).
+-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()).
-spec(list/1 :: (rabbit_types:vhost()) -> bindings()).
-spec(list_for_source/1 ::
(rabbit_types:binding_source()) -> bindings()).
@@ -93,14 +93,27 @@
destination_name, destination_kind,
routing_key, arguments]).
-recover() ->
- rabbit_misc:table_fold(
- fun (Route = #route{binding = B}, Acc) ->
- {_, ReverseRoute} = route_with_reverse(Route),
- ok = mnesia:write(rabbit_route, Route, write),
- ok = mnesia:write(rabbit_reverse_route, ReverseRoute, write),
- [B | Acc]
- end, [], rabbit_durable_route).
+recover(XNames, QNames) ->
+ XNameSet = sets:from_list(XNames),
+ QNameSet = sets:from_list(QNames),
+ rabbit_misc:table_filter(
+ fun (#route{binding = #binding{destination = Dst =
+ #resource{kind = Kind}}}) ->
+ sets:is_element(Dst, case Kind of
+ exchange -> XNameSet;
+ queue -> QNameSet
+ end)
+ end,
+ fun (R = #route{binding = B = #binding{source = Src}}, Tx) ->
+ case Tx of
+ true -> ok = sync_transient_binding(R, fun mnesia:write/3);
+ false -> ok
+ end,
+ {ok, X} = rabbit_exchange:lookup(Src),
+ rabbit_exchange:callback(X, add_binding, [Tx, X, B])
+ end,
+ rabbit_durable_route),
+ ok.
exists(Binding) ->
binding_action(
@@ -110,8 +123,6 @@ exists(Binding) ->
add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end).
-remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end).
-
add(Binding, InnerFun) ->
binding_action(
Binding,
@@ -120,51 +131,46 @@ add(Binding, InnerFun) ->
%% in general, we want to fail on that in preference to
%% anything else
case InnerFun(Src, Dst) of
- ok ->
- case mnesia:read({rabbit_route, B}) of
- [] -> ok = sync_binding(B, all_durable([Src, Dst]),
- fun mnesia:write/3),
- fun (Tx) ->
- ok = rabbit_exchange:callback(
- Src, add_binding, [Tx, Src, B]),
- rabbit_event:notify_if(
- not Tx, binding_created, info(B))
- end;
- [_] -> fun rabbit_misc:const_ok/1
- end;
- {error, _} = Err ->
- rabbit_misc:const(Err)
+ ok -> case mnesia:read({rabbit_route, B}) of
+ [] -> add(Src, Dst, B);
+ [_] -> fun rabbit_misc:const_ok/1
+ end;
+ {error, _} = Err -> rabbit_misc:const(Err)
end
end).
+add(Src, Dst, B) ->
+ Durable = all_durable([Src, Dst]),
+ case (not Durable orelse mnesia:read({rabbit_durable_route, B}) =:= []) of
+ true -> ok = sync_binding(B, Durable, fun mnesia:write/3),
+ fun (Tx) -> ok = rabbit_exchange:callback(Src, add_binding,
+ [Tx, Src, B]),
+ rabbit_event:notify_if(not Tx, binding_created,
+ info(B))
+ end;
+ false -> rabbit_misc:const({error, binding_not_found})
+ end.
+
+remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end).
+
remove(Binding, InnerFun) ->
binding_action(
Binding,
fun (Src, Dst, B) ->
- Result =
- case mnesia:match_object(rabbit_route, #route{binding = B},
- write) of
- [] ->
- {error, binding_not_found};
- [_] ->
- case InnerFun(Src, Dst) of
- ok ->
- ok = sync_binding(B, all_durable([Src, Dst]),
- fun mnesia:delete_object/3),
- {ok, maybe_auto_delete(B#binding.source,
- [B], new_deletions())};
- {error, _} = E ->
- E
- end
- end,
- case Result of
- {error, _} = Err ->
- rabbit_misc:const(Err);
- {ok, Deletions} ->
- fun (Tx) -> ok = process_deletions(Deletions, Tx) end
+ case mnesia:read(rabbit_route, B, write) of
+ [] -> rabbit_misc:const({error, binding_not_found});
+ [_] -> case InnerFun(Src, Dst) of
+ ok -> remove(Src, Dst, B);
+ {error, _} = Err -> rabbit_misc:const(Err)
+ end
end
end).
+remove(Src, Dst, B) ->
+ ok = sync_binding(B, all_durable([Src, Dst]), fun mnesia:delete_object/3),
+ Deletions = maybe_auto_delete(B#binding.source, [B], new_deletions()),
+ fun (Tx) -> ok = process_deletions(Deletions, Tx) end.
+
list(VHostPath) ->
VHostResource = rabbit_misc:r(VHostPath, '_'),
Route = #route{binding = #binding{source = VHostResource,
@@ -259,31 +265,31 @@ binding_action(Binding = #binding{source = SrcName,
Fun(Src, Dst, Binding#binding{args = SortedArgs})
end).
-sync_binding(Binding, Durable, Fun) ->
- ok = case Durable of
- true -> Fun(rabbit_durable_route,
- #route{binding = Binding}, write);
- false -> ok
- end,
+sync_binding(Binding, true, Fun) ->
+ ok = Fun(rabbit_durable_route, #route{binding = Binding}, write),
+ ok = sync_transient_binding(Binding, Fun);
+
+sync_binding(Binding, false, Fun) ->
+ ok = sync_transient_binding(Binding, Fun).
+
+sync_transient_binding(Binding, Fun) ->
{Route, ReverseRoute} = route_with_reverse(Binding),
ok = Fun(rabbit_route, Route, write),
- ok = Fun(rabbit_reverse_route, ReverseRoute, write),
- ok.
+ ok = Fun(rabbit_reverse_route, ReverseRoute, write).
call_with_source_and_destination(SrcName, DstName, Fun) ->
SrcTable = table_for_resource(SrcName),
DstTable = table_for_resource(DstName),
- ErrFun = fun (Err) -> rabbit_misc:const(Err) end,
+ ErrFun = fun (Err) -> rabbit_misc:const({error, Err}) end,
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case {mnesia:read({SrcTable, SrcName}),
mnesia:read({DstTable, DstName})} of
{[Src], [Dst]} -> Fun(Src, Dst);
- {[], [_] } -> ErrFun({error, source_not_found});
- {[_], [] } -> ErrFun({error, destination_not_found});
- {[], [] } -> ErrFun({error,
- source_and_destination_not_found})
- end
+ {[], [_] } -> ErrFun(source_not_found);
+ {[_], [] } -> ErrFun(destination_not_found);
+ {[], [] } -> ErrFun(source_and_destination_not_found)
+ end
end).
table_for_resource(#resource{kind = exchange}) -> rabbit_exchange;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 8a234c0f..bfd779ee 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -156,6 +156,7 @@ ready_for_close(Pid) ->
init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
Capabilities, CollectorPid, StartLimiterFun]) ->
+ process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
StatsTimer = rabbit_event:init_stats_timer(),
State = #ch{state = starting,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 9d9b07af..42111773 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -36,7 +36,7 @@
-type(type() :: atom()).
-type(fun_name() :: atom()).
--spec(recover/0 :: () -> 'ok').
+-spec(recover/0 :: () -> [name()]).
-spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok').
-spec(declare/6 ::
(name(), type(), boolean(), boolean(), boolean(),
@@ -83,25 +83,19 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]).
recover() ->
- Xs = rabbit_misc:table_fold(
- fun (X, Acc) ->
- ok = mnesia:write(rabbit_exchange, X, write),
- [X | Acc]
- end, [], rabbit_durable_exchange),
- Bs = rabbit_binding:recover(),
- recover_with_bindings(
- lists:keysort(#binding.source, Bs),
- lists:keysort(#exchange.name, Xs), []).
-
-recover_with_bindings([B = #binding{source = XName} | Rest],
- Xs = [#exchange{name = XName} | _],
- Bindings) ->
- recover_with_bindings(Rest, Xs, [B | Bindings]);
-recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) ->
- (type_to_module(Type)):recover(X, Bindings),
- recover_with_bindings(Bs, Xs, []);
-recover_with_bindings([], [], []) ->
- ok.
+ Xs = rabbit_misc:table_filter(
+ fun (#exchange{name = XName}) ->
+ mnesia:read({rabbit_exchange, XName}) =:= []
+ end,
+ fun (X, Tx) ->
+ case Tx of
+ true -> ok = mnesia:write(rabbit_exchange, X, write);
+ false -> ok
+ end,
+ rabbit_exchange:callback(X, create, [Tx, X])
+ end,
+ rabbit_durable_exchange),
+ [XName || #exchange{name = XName} <- Xs].
callback(#exchange{type = XType}, Fun, Args) ->
apply(type_to_module(XType), Fun, Args).
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 547583e9..cd96407c 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -26,16 +26,13 @@ behaviour_info(callbacks) ->
%% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
{validate, 1},
- %% called after declaration when previously absent
+ %% called after declaration and recovery
{create, 2},
- %% called when recovering
- {recover, 2},
-
- %% called after exchange deletion.
+ %% called after exchange (auto)deletion.
{delete, 3},
- %% called after a binding has been added
+ %% called after a binding has been added or recovered
{add_binding, 3},
%% called after bindings have been deleted.
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 349c2f6e..40078b1a 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/2, recover/2, delete/3,
+-export([validate/1, create/2, delete/3,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -41,7 +41,6 @@ route(#exchange{name = Name},
validate(_X) -> ok.
create(_Tx, _X) -> ok.
-recover(_X, _Bs) -> ok.
delete(_Tx, _X, _Bs) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index bc5293c8..f32ef917 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/2, recover/2, delete/3, add_binding/3,
+-export([validate/1, create/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -40,7 +40,6 @@ route(#exchange{name = Name}, _Delivery) ->
validate(_X) -> ok.
create(_Tx, _X) -> ok.
-recover(_X, _Bs) -> ok.
delete(_Tx, _X, _Bs) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index d3529b06..139feb04 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -21,7 +21,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/2, recover/2, delete/3, add_binding/3,
+-export([validate/1, create/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -114,7 +114,6 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
validate(_X) -> ok.
create(_Tx, _X) -> ok.
-recover(_X, _Bs) -> ok.
delete(_Tx, _X, _Bs) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index c192f8cf..74c566b8 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -21,7 +21,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/2, recover/2, delete/3, add_binding/3,
+-export([validate/1, create/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -49,12 +49,6 @@ route(#exchange{name = X},
validate(_X) -> ok.
create(_Tx, _X) -> ok.
-recover(_Exchange, Bs) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- lists:foreach(fun (B) -> internal_add_binding(B) end, Bs)
- end).
-
delete(true, #exchange{name = X}, _Bs) ->
trie_remove_all_edges(X),
trie_remove_all_bindings(X),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 1daeeb2a..cec10ff6 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -38,7 +38,7 @@
-export([ensure_ok/2]).
-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
-export([upmap/2, map_in_order/2]).
--export([table_fold/3]).
+-export([table_filter/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([read_term_file/1, write_term_file/2]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
@@ -145,7 +145,8 @@
-> atom()).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
--spec(table_fold/3 :: (fun ((any(), A) -> A), A, atom()) -> A).
+-spec(table_filter/3:: (fun ((A) -> boolean()), fun ((A, boolean()) -> 'ok'),
+ atom()) -> [A]).
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom())
-> 'ok' | 'aborted').
@@ -459,20 +460,23 @@ map_in_order(F, L) ->
lists:reverse(
lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)).
-%% Fold over each entry in a table, executing the cons function in a
-%% transaction. This is often far more efficient than wrapping a tx
-%% around the lot.
+%% Apply a pre-post-commit function to all entries in a table that
+%% satisfy a predicate, and return those entries.
%%
%% We ignore entries that have been modified or removed.
-table_fold(F, Acc0, TableName) ->
+table_filter(Pred, PrePostCommitFun, TableName) ->
lists:foldl(
- fun (E, Acc) -> execute_mnesia_transaction(
- fun () -> case mnesia:match_object(TableName, E, read) of
- [] -> Acc;
- _ -> F(E, Acc)
- end
- end)
- end, Acc0, dirty_read_all(TableName)).
+ fun (E, Acc) ->
+ case execute_mnesia_transaction(
+ fun () -> mnesia:match_object(TableName, E, read) =/= []
+ andalso Pred(E) end,
+ fun (false, _Tx) -> false;
+ (true, Tx) -> PrePostCommitFun(E, Tx), true
+ end) of
+ false -> Acc;
+ true -> [E | Acc]
+ end
+ end, [], dirty_read_all(TableName)).
dirty_read_all(TableName) ->
mnesia:dirty_select(TableName, [{'$1',[],['$1']}]).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 65688142..3f4162cd 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1891,7 +1891,10 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
force_recovery(BaseDir, Store) ->
Dir = filename:join(BaseDir, atom_to_list(Store)),
- ok = file:delete(filename:join(Dir, ?CLEAN_FILENAME)),
+ case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
+ ok -> ok;
+ {error, enoent} -> ok
+ end,
recover_crashed_compactions(BaseDir),
ok.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 609bb43f..42af91a8 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -681,7 +681,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
State#v1{connection_state = running,
connection = NewConnection}),
rabbit_event:notify(connection_created,
- infos(?CREATION_EVENT_KEYS, State1)),
+ [{type, network} |
+ infos(?CREATION_EVENT_KEYS, State1)]),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State1) end),
State1;
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index a3cd2b37..e0defa9e 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -211,7 +211,8 @@ format_asn1_value(V) ->
%% subset of ASCII it is also a subset of UTF-8. The others need
%% converting. Fortunately since the Erlang SSL library does the
%% decoding for us (albeit into a weird format, see below), we just
-%% need to handle encoding into UTF-8.
+%% need to handle encoding into UTF-8. Note also that utf8Strings come
+%% back as binary.
%%
%% Note for testing: the default Ubuntu configuration for openssl will
%% only create printableString or teletexString types no matter what
@@ -225,7 +226,7 @@ format_directory_string(printableString, S) -> S;
format_directory_string(teletexString, S) -> utf8_list_from(S);
format_directory_string(bmpString, S) -> utf8_list_from(S);
format_directory_string(universalString, S) -> utf8_list_from(S);
-format_directory_string(utf8String, S) -> S.
+format_directory_string(utf8String, S) -> binary_to_list(S).
utf8_list_from(S) ->
binary_to_list(
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 294fae97..38492984 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2320,7 +2320,7 @@ test_queue_recover() ->
after 10000 -> exit(timeout_waiting_for_queue_death)
end,
rabbit_amqqueue:stop(),
- ok = rabbit_amqqueue:start(),
+ rabbit_amqqueue:start(),
rabbit_amqqueue:with_or_die(
QName,
fun (Q1 = #amqqueue { pid = QPid1 }) ->