summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-10-18 10:59:35 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-10-18 10:59:35 +0100
commitb045fefb25ab3d7330ed8b35f072c9a1f0d6a22b (patch)
tree20768ebd68ec1280ad4fb6f495e5a08866dcf50c
parent3b2fb31d009e56c7a57a7d3e7a78cc27ebb6ff0c (diff)
parent133cde4888174bc7d9a294cdf1895ae4ed274708 (diff)
downloadrabbitmq-server-b045fefb25ab3d7330ed8b35f072c9a1f0d6a22b.tar.gz
Merged bug23405
-rw-r--r--src/rabbit_binding.erl8
-rw-r--r--src/rabbit_msg_store.erl109
-rw-r--r--src/rabbit_reader.erl28
3 files changed, 77 insertions, 68 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 53c9c663..1af213c4 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -426,10 +426,14 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
process_deletions(Deletions) ->
dict:fold(
fun (_XName, {X = #exchange{ type = Type }, Deleted, Bindings}, ok) ->
- TypeModule = type_to_module(Type),
FlatBindings = lists:flatten(Bindings),
+ [rabbit_event:notify(binding_deleted, info(B)) ||
+ B <- FlatBindings],
+ TypeModule = type_to_module(Type),
case Deleted of
not_deleted -> TypeModule:remove_bindings(X, FlatBindings);
- deleted -> TypeModule:delete(X, FlatBindings)
+ deleted -> rabbit_event:notify(exchange_deleted,
+ [{name, X#exchange.name}]),
+ TypeModule:delete(X, FlatBindings)
end
end, ok, Deletions).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 81d3c501..66cc06cf 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -33,9 +33,9 @@
-behaviour(gen_server2).
--export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
- sync/3, client_init/2, client_terminate/2,
- client_delete_and_terminate/3, successfully_recovered_state/1]).
+-export([start_link/4, successfully_recovered_state/1,
+ client_init/2, client_terminate/2, client_delete_and_terminate/3,
+ write/4, read/3, contains/2, remove/2, release/2, sync/3]).
-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
@@ -123,6 +123,11 @@
-spec(start_link/4 ::
(atom(), file:filename(), [binary()] | 'undefined',
startup_fun_state()) -> rabbit_types:ok_pid_or_error()).
+-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
+-spec(client_init/2 :: (server(), binary()) -> client_msstate()).
+-spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok').
+-spec(client_delete_and_terminate/3 ::
+ (client_msstate(), server(), binary()) -> 'ok').
-spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) ->
rabbit_types:ok(client_msstate())).
-spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) ->
@@ -131,15 +136,11 @@
-spec(remove/2 :: (server(), [rabbit_guid:guid()]) -> 'ok').
-spec(release/2 :: (server(), [rabbit_guid:guid()]) -> 'ok').
-spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok').
+
+-spec(sync/1 :: (server()) -> 'ok').
-spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) ->
'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
--spec(client_init/2 :: (server(), binary()) -> client_msstate()).
--spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok').
--spec(client_delete_and_terminate/3 ::
- (client_msstate(), server(), binary()) -> 'ok').
--spec(successfully_recovered_state/1 :: (server()) -> boolean()).
-
-spec(gc/3 :: (non_neg_integer(), non_neg_integer(),
{ets:tid(), file:filename(), atom(), any()}) ->
'concurrent_readers' | non_neg_integer()).
@@ -308,6 +309,31 @@ start_link(Server, Dir, ClientRefs, StartupFunState) ->
[Server, Dir, ClientRefs, StartupFunState],
[{timeout, infinity}]).
+successfully_recovered_state(Server) ->
+ gen_server2:call(Server, successfully_recovered_state, infinity).
+
+client_init(Server, Ref) ->
+ {IState, IModule, Dir, GCPid,
+ FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
+ gen_server2:call(Server, {new_client_state, Ref}, infinity),
+ #client_msstate { file_handle_cache = dict:new(),
+ index_state = IState,
+ index_module = IModule,
+ dir = Dir,
+ gc_pid = GCPid,
+ file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts,
+ cur_file_cache_ets = CurFileCacheEts }.
+
+client_terminate(CState, Server) ->
+ close_all_handles(CState),
+ ok = gen_server2:call(Server, client_terminate, infinity).
+
+client_delete_and_terminate(CState, Server, Ref) ->
+ close_all_handles(CState),
+ ok = gen_server2:cast(Server, {client_delete, Ref}).
+
write(Server, Guid, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
@@ -345,7 +371,9 @@ remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}).
release(_Server, []) -> ok;
release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}).
sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}).
-sync(Server) -> gen_server2:cast(Server, sync). %% internal
+
+sync(Server) ->
+ gen_server2:cast(Server, sync).
gc_done(Server, Reclaimed, Source, Destination) ->
gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}).
@@ -353,31 +381,6 @@ gc_done(Server, Reclaimed, Source, Destination) ->
set_maximum_since_use(Server, Age) ->
gen_server2:cast(Server, {set_maximum_since_use, Age}).
-client_init(Server, Ref) ->
- {IState, IModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
- gen_server2:call(Server, {new_client_state, Ref}, infinity),
- #client_msstate { file_handle_cache = dict:new(),
- index_state = IState,
- index_module = IModule,
- dir = Dir,
- gc_pid = GCPid,
- file_handles_ets = FileHandlesEts,
- file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts }.
-
-client_terminate(CState, Server) ->
- close_all_handles(CState),
- ok = gen_server2:call(Server, client_terminate, infinity).
-
-client_delete_and_terminate(CState, Server, Ref) ->
- close_all_handles(CState),
- ok = gen_server2:cast(Server, {client_delete, Ref}).
-
-successfully_recovered_state(Server) ->
- gen_server2:call(Server, successfully_recovered_state, infinity).
-
%%----------------------------------------------------------------------------
%% Client-side-only helpers
%%----------------------------------------------------------------------------
@@ -577,8 +580,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- {new_client_state, _Ref} -> 7;
successfully_recovered_state -> 7;
+ {new_client_state, _Ref} -> 7;
{read, _Guid} -> 2;
_ -> 0
end.
@@ -591,13 +594,8 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
-handle_call({read, Guid}, From, State) ->
- State1 = read_message(Guid, From, State),
- noreply(State1);
-
-handle_call({contains, Guid}, From, State) ->
- State1 = contains_message(Guid, From, State),
- noreply(State1);
+handle_call(successfully_recovered_state, _From, State) ->
+ reply(State #msstate.successfully_recovered, State);
handle_call({new_client_state, CRef}, _From,
State = #msstate { dir = Dir,
@@ -613,11 +611,21 @@ handle_call({new_client_state, CRef}, _From,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
State #msstate { client_refs = sets:add_element(CRef, ClientRefs) });
-handle_call(successfully_recovered_state, _From, State) ->
- reply(State #msstate.successfully_recovered, State);
-
handle_call(client_terminate, _From, State) ->
- reply(ok, State).
+ reply(ok, State);
+
+handle_call({read, Guid}, From, State) ->
+ State1 = read_message(Guid, From, State),
+ noreply(State1);
+
+handle_call({contains, Guid}, From, State) ->
+ State1 = contains_message(Guid, From, State),
+ noreply(State1).
+
+handle_cast({client_delete, CRef},
+ State = #msstate { client_refs = ClientRefs }) ->
+ noreply(
+ State #msstate { client_refs = sets:del_element(CRef, ClientRefs) });
handle_cast({write, Guid},
State = #msstate { sum_valid_data = SumValid,
@@ -712,12 +720,7 @@ handle_cast({gc_done, Reclaimed, Src, Dst},
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
- noreply(State);
-
-handle_cast({client_delete, CRef},
- State = #msstate { client_refs = ClientRefs }) ->
- noreply(
- State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }).
+ noreply(State).
handle_info(timeout, State) ->
noreply(internal_sync(State));
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e500b111..29004bd5 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -815,17 +815,13 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, #v1{}) ->
self();
i(address, #v1{sock = Sock}) ->
- {ok, {A, _}} = rabbit_net:sockname(Sock),
- A;
+ socket_info(fun rabbit_net:sockname/1, fun ({A, _}) -> A end, Sock);
i(port, #v1{sock = Sock}) ->
- {ok, {_, P}} = rabbit_net:sockname(Sock),
- P;
+ socket_info(fun rabbit_net:sockname/1, fun ({_, P}) -> P end, Sock);
i(peer_address, #v1{sock = Sock}) ->
- {ok, {A, _}} = rabbit_net:peername(Sock),
- A;
+ socket_info(fun rabbit_net:peername/1, fun ({A, _}) -> A end, Sock);
i(peer_port, #v1{sock = Sock}) ->
- {ok, {_, P}} = rabbit_net:peername(Sock),
- P;
+ socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock);
i(peer_cert_issuer, #v1{sock = Sock}) ->
cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock);
i(peer_cert_subject, #v1{sock = Sock}) ->
@@ -837,11 +833,8 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
SockStat =:= send_oct;
SockStat =:= send_cnt;
SockStat =:= send_pend ->
- case rabbit_net:getstat(Sock, [SockStat]) of
- {ok, [{SockStat, StatVal}]} -> StatVal;
- {error, einval} -> undefined;
- {error, Error} -> throw({cannot_get_socket_stats, Error})
- end;
+ socket_info(fun () -> rabbit_net:getstat(Sock, [SockStat]) end,
+ fun ([{_, I}]) -> I end);
i(state, #v1{connection_state = S}) ->
S;
i(channels, #v1{}) ->
@@ -866,6 +859,15 @@ i(client_properties, #v1{connection = #connection{
i(Item, #v1{}) ->
throw({bad_argument, Item}).
+socket_info(Get, Select, Sock) ->
+ socket_info(fun() -> Get(Sock) end, Select).
+
+socket_info(Get, Select) ->
+ case Get() of
+ {ok, T} -> Select(T);
+ {error, _} -> ''
+ end.
+
cert_info(F, Sock) ->
case rabbit_net:peercert(Sock) of
nossl -> '';