diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-10-18 10:59:35 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-10-18 10:59:35 +0100 |
commit | b045fefb25ab3d7330ed8b35f072c9a1f0d6a22b (patch) | |
tree | 20768ebd68ec1280ad4fb6f495e5a08866dcf50c | |
parent | 3b2fb31d009e56c7a57a7d3e7a78cc27ebb6ff0c (diff) | |
parent | 133cde4888174bc7d9a294cdf1895ae4ed274708 (diff) | |
download | rabbitmq-server-b045fefb25ab3d7330ed8b35f072c9a1f0d6a22b.tar.gz |
Merged bug23405
-rw-r--r-- | src/rabbit_binding.erl | 8 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 109 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 28 |
3 files changed, 77 insertions, 68 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 53c9c663..1af213c4 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -426,10 +426,14 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> process_deletions(Deletions) -> dict:fold( fun (_XName, {X = #exchange{ type = Type }, Deleted, Bindings}, ok) -> - TypeModule = type_to_module(Type), FlatBindings = lists:flatten(Bindings), + [rabbit_event:notify(binding_deleted, info(B)) || + B <- FlatBindings], + TypeModule = type_to_module(Type), case Deleted of not_deleted -> TypeModule:remove_bindings(X, FlatBindings); - deleted -> TypeModule:delete(X, FlatBindings) + deleted -> rabbit_event:notify(exchange_deleted, + [{name, X#exchange.name}]), + TypeModule:delete(X, FlatBindings) end end, ok, Deletions). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 81d3c501..66cc06cf 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -33,9 +33,9 @@ -behaviour(gen_server2). --export([start_link/4, write/4, read/3, contains/2, remove/2, release/2, - sync/3, client_init/2, client_terminate/2, - client_delete_and_terminate/3, successfully_recovered_state/1]). +-export([start_link/4, successfully_recovered_state/1, + client_init/2, client_terminate/2, client_delete_and_terminate/3, + write/4, read/3, contains/2, remove/2, release/2, sync/3]). -export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal @@ -123,6 +123,11 @@ -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', startup_fun_state()) -> rabbit_types:ok_pid_or_error()). +-spec(successfully_recovered_state/1 :: (server()) -> boolean()). +-spec(client_init/2 :: (server(), binary()) -> client_msstate()). +-spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). +-spec(client_delete_and_terminate/3 :: + (client_msstate(), server(), binary()) -> 'ok'). -spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) -> rabbit_types:ok(client_msstate())). -spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) -> @@ -131,15 +136,11 @@ -spec(remove/2 :: (server(), [rabbit_guid:guid()]) -> 'ok'). -spec(release/2 :: (server(), [rabbit_guid:guid()]) -> 'ok'). -spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok'). + +-spec(sync/1 :: (server()) -> 'ok'). -spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). --spec(client_init/2 :: (server(), binary()) -> client_msstate()). --spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). --spec(client_delete_and_terminate/3 :: - (client_msstate(), server(), binary()) -> 'ok'). --spec(successfully_recovered_state/1 :: (server()) -> boolean()). - -spec(gc/3 :: (non_neg_integer(), non_neg_integer(), {ets:tid(), file:filename(), atom(), any()}) -> 'concurrent_readers' | non_neg_integer()). @@ -308,6 +309,31 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> [Server, Dir, ClientRefs, StartupFunState], [{timeout, infinity}]). +successfully_recovered_state(Server) -> + gen_server2:call(Server, successfully_recovered_state, infinity). + +client_init(Server, Ref) -> + {IState, IModule, Dir, GCPid, + FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = + gen_server2:call(Server, {new_client_state, Ref}, infinity), + #client_msstate { file_handle_cache = dict:new(), + index_state = IState, + index_module = IModule, + dir = Dir, + gc_pid = GCPid, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts }. + +client_terminate(CState, Server) -> + close_all_handles(CState), + ok = gen_server2:call(Server, client_terminate, infinity). + +client_delete_and_terminate(CState, Server, Ref) -> + close_all_handles(CState), + ok = gen_server2:cast(Server, {client_delete, Ref}). + write(Server, Guid, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), @@ -345,7 +371,9 @@ remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}). release(_Server, []) -> ok; release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}). sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}). -sync(Server) -> gen_server2:cast(Server, sync). %% internal + +sync(Server) -> + gen_server2:cast(Server, sync). gc_done(Server, Reclaimed, Source, Destination) -> gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}). @@ -353,31 +381,6 @@ gc_done(Server, Reclaimed, Source, Destination) -> set_maximum_since_use(Server, Age) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). -client_init(Server, Ref) -> - {IState, IModule, Dir, GCPid, - FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = - gen_server2:call(Server, {new_client_state, Ref}, infinity), - #client_msstate { file_handle_cache = dict:new(), - index_state = IState, - index_module = IModule, - dir = Dir, - gc_pid = GCPid, - file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts }. - -client_terminate(CState, Server) -> - close_all_handles(CState), - ok = gen_server2:call(Server, client_terminate, infinity). - -client_delete_and_terminate(CState, Server, Ref) -> - close_all_handles(CState), - ok = gen_server2:cast(Server, {client_delete, Ref}). - -successfully_recovered_state(Server) -> - gen_server2:call(Server, successfully_recovered_state, infinity). - %%---------------------------------------------------------------------------- %% Client-side-only helpers %%---------------------------------------------------------------------------- @@ -577,8 +580,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> prioritise_call(Msg, _From, _State) -> case Msg of - {new_client_state, _Ref} -> 7; successfully_recovered_state -> 7; + {new_client_state, _Ref} -> 7; {read, _Guid} -> 2; _ -> 0 end. @@ -591,13 +594,8 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. -handle_call({read, Guid}, From, State) -> - State1 = read_message(Guid, From, State), - noreply(State1); - -handle_call({contains, Guid}, From, State) -> - State1 = contains_message(Guid, From, State), - noreply(State1); +handle_call(successfully_recovered_state, _From, State) -> + reply(State #msstate.successfully_recovered, State); handle_call({new_client_state, CRef}, _From, State = #msstate { dir = Dir, @@ -613,11 +611,21 @@ handle_call({new_client_state, CRef}, _From, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts}, State #msstate { client_refs = sets:add_element(CRef, ClientRefs) }); -handle_call(successfully_recovered_state, _From, State) -> - reply(State #msstate.successfully_recovered, State); - handle_call(client_terminate, _From, State) -> - reply(ok, State). + reply(ok, State); + +handle_call({read, Guid}, From, State) -> + State1 = read_message(Guid, From, State), + noreply(State1); + +handle_call({contains, Guid}, From, State) -> + State1 = contains_message(Guid, From, State), + noreply(State1). + +handle_cast({client_delete, CRef}, + State = #msstate { client_refs = ClientRefs }) -> + noreply( + State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }); handle_cast({write, Guid}, State = #msstate { sum_valid_data = SumValid, @@ -712,12 +720,7 @@ handle_cast({gc_done, Reclaimed, Src, Dst}, handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), - noreply(State); - -handle_cast({client_delete, CRef}, - State = #msstate { client_refs = ClientRefs }) -> - noreply( - State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }). + noreply(State). handle_info(timeout, State) -> noreply(internal_sync(State)); diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e500b111..29004bd5 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -815,17 +815,13 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, #v1{}) -> self(); i(address, #v1{sock = Sock}) -> - {ok, {A, _}} = rabbit_net:sockname(Sock), - A; + socket_info(fun rabbit_net:sockname/1, fun ({A, _}) -> A end, Sock); i(port, #v1{sock = Sock}) -> - {ok, {_, P}} = rabbit_net:sockname(Sock), - P; + socket_info(fun rabbit_net:sockname/1, fun ({_, P}) -> P end, Sock); i(peer_address, #v1{sock = Sock}) -> - {ok, {A, _}} = rabbit_net:peername(Sock), - A; + socket_info(fun rabbit_net:peername/1, fun ({A, _}) -> A end, Sock); i(peer_port, #v1{sock = Sock}) -> - {ok, {_, P}} = rabbit_net:peername(Sock), - P; + socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock); i(peer_cert_issuer, #v1{sock = Sock}) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock); i(peer_cert_subject, #v1{sock = Sock}) -> @@ -837,11 +833,8 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; SockStat =:= send_oct; SockStat =:= send_cnt; SockStat =:= send_pend -> - case rabbit_net:getstat(Sock, [SockStat]) of - {ok, [{SockStat, StatVal}]} -> StatVal; - {error, einval} -> undefined; - {error, Error} -> throw({cannot_get_socket_stats, Error}) - end; + socket_info(fun () -> rabbit_net:getstat(Sock, [SockStat]) end, + fun ([{_, I}]) -> I end); i(state, #v1{connection_state = S}) -> S; i(channels, #v1{}) -> @@ -866,6 +859,15 @@ i(client_properties, #v1{connection = #connection{ i(Item, #v1{}) -> throw({bad_argument, Item}). +socket_info(Get, Select, Sock) -> + socket_info(fun() -> Get(Sock) end, Select). + +socket_info(Get, Select) -> + case Get() of + {ok, T} -> Select(T); + {error, _} -> '' + end. + cert_info(F, Sock) -> case rabbit_net:peercert(Sock) of nossl -> ''; |