diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-04-05 14:09:54 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-04-05 14:09:54 +0100 |
commit | 0bad583b1b18d6c8be7eec03e1ad8e9add149214 (patch) | |
tree | c6893a20e2c678d80a85420f17c2b7e19a655835 | |
parent | 9298d8a1dd79f862964a1d101908562b49fc91f6 (diff) | |
parent | f4b10422c87885a746ac509c68a677f18bc60296 (diff) | |
download | rabbitmq-server-0bad583b1b18d6c8be7eec03e1ad8e9add149214.tar.gz |
merge heads
-rwxr-xr-x | packaging/common/rabbitmq-server.ocf | 8 | ||||
-rwxr-xr-x | scripts/rabbitmq-env | 3 | ||||
-rw-r--r-- | src/gm.erl | 11 | ||||
-rw-r--r-- | src/gm_soak_test.erl | 7 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 14 | ||||
-rw-r--r-- | src/rabbit_control.erl | 17 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 8 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 10 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 146 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 6 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 11 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 7 | ||||
-rw-r--r-- | src/supervisor2.erl | 25 | ||||
-rw-r--r-- | src/test_sup.erl | 2 |
14 files changed, 102 insertions, 173 deletions
diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf index 94999d0e..d58c48ed 100755 --- a/packaging/common/rabbitmq-server.ocf +++ b/packaging/common/rabbitmq-server.ocf @@ -103,9 +103,9 @@ The IP Port for rabbitmq-server to listen on <parameter name="config_file" unique="0" required="0"> <longdesc lang="en"> -Location of the config file +Location of the config file (without the .config suffix) </longdesc> -<shortdesc lang="en">Config file path</shortdesc> +<shortdesc lang="en">Config file path (without the .config suffix)</shortdesc> <content type="string" default="" /> </parameter> @@ -189,8 +189,8 @@ rabbit_validate_partial() { } rabbit_validate_full() { - if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e $RABBITMQ_CONFIG_FILE ]; then - ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file"; + if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e "${RABBITMQ_CONFIG_FILE}.config" ]; then + ocf_log err "rabbitmq-server config_file ${RABBITMQ_CONFIG_FILE}.config does not exist or is not a file"; exit $OCF_ERR_INSTALLED; fi diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index 3e173949..a2ef8d3c 100755 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -37,7 +37,8 @@ RABBITMQ_HOME="${SCRIPT_DIR}/.." NODENAME=rabbit@${HOSTNAME%%.*} # Load configuration from the rabbitmq.conf file -if [ -f /etc/rabbitmq/rabbitmq.conf ]; then +if [ -f /etc/rabbitmq/rabbitmq.conf ] && \ + [ ! -f /etc/rabbitmq/rabbitmq-env.conf ] ; then echo -n "WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- " echo "location has moved to /etc/rabbitmq/rabbitmq-env.conf" fi @@ -516,7 +516,8 @@ flush(Server) -> init([GroupName, Module, Args]) -> - random:seed(now()), + {MegaSecs, Secs, MicroSecs} = now(), + random:seed(MegaSecs, Secs, MicroSecs), gen_server2:cast(self(), join), Self = self(), {ok, #state { self = Self, @@ -1010,7 +1011,7 @@ prune_or_create_group(Self, GroupName) -> fun () -> GroupNew = #gm_group { name = GroupName, members = [Self], version = 0 }, - case mnesia:read(?GROUP_TABLE, GroupName) of + case mnesia:read({?GROUP_TABLE, GroupName}) of [] -> mnesia:write(GroupNew), GroupNew; @@ -1028,7 +1029,7 @@ record_dead_member_in_group(Member, GroupName) -> {atomic, Group} = mnesia:sync_transaction( fun () -> [Group1 = #gm_group { members = Members, version = Ver }] = - mnesia:read(?GROUP_TABLE, GroupName), + mnesia:read({?GROUP_TABLE, GroupName}), case lists:splitwith( fun (Member1) -> Member1 =/= Member end, Members) of {_Members1, []} -> %% not found - already recorded dead @@ -1048,7 +1049,7 @@ record_new_member_in_group(GroupName, Left, NewMember, Fun) -> mnesia:sync_transaction( fun () -> [#gm_group { members = Members, version = Ver } = Group1] = - mnesia:read(?GROUP_TABLE, GroupName), + mnesia:read({?GROUP_TABLE, GroupName}), {Prefix, [Left | Suffix]} = lists:splitwith(fun (M) -> M =/= Left end, Members), Members1 = Prefix ++ [Left, NewMember | Suffix], @@ -1067,7 +1068,7 @@ erase_members_in_group(Members, GroupName) -> fun () -> [Group1 = #gm_group { members = [_|_] = Members1, version = Ver }] = - mnesia:read(?GROUP_TABLE, GroupName), + mnesia:read({?GROUP_TABLE, GroupName}), case Members1 -- DeadMembers of Members1 -> Group1; Members2 -> Group2 = diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl index 1f8832a6..dae42ac7 100644 --- a/src/gm_soak_test.erl +++ b/src/gm_soak_test.erl @@ -35,7 +35,7 @@ with_state(Fun) -> inc() -> case 1 + get(count) of - 100000 -> Now = os:timestamp(), + 100000 -> Now = now(), Start = put(ts, Now), Diff = timer:now_diff(Now, Start), Rate = 100000 / (Diff / 1000000), @@ -48,7 +48,7 @@ joined([], Members) -> io:format("Joined ~p (~p members)~n", [self(), length(Members)]), put(state, dict:from_list([{Member, empty} || Member <- Members])), put(count, 0), - put(ts, os:timestamp()), + put(ts, now()), ok. members_changed([], Births, Deaths) -> @@ -101,7 +101,8 @@ terminate([], Reason) -> spawn_member() -> spawn_link( fun () -> - random:seed(now()), + {MegaSecs, Secs, MicroSecs} = now(), + random:seed(MegaSecs, Secs, MicroSecs), %% start up delay of no more than 10 seconds timer:sleep(random:uniform(10000)), {ok, Pid} = gm:start_link(?MODULE, ?MODULE, []), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3f5758ce..2b0fe17e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -428,11 +428,19 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> {CMs, MTC0} end end, {gb_trees:empty(), MTC}, MsgIds), - gb_trees:map(fun(ChPid, MsgSeqNos) -> - rabbit_channel:confirm(ChPid, MsgSeqNos) - end, CMs), + gb_trees_foreach(fun(ChPid, MsgSeqNos) -> + rabbit_channel:confirm(ChPid, MsgSeqNos) + end, CMs), State#q{msg_id_to_channel = MTC1}. +gb_trees_foreach(_, none) -> + ok; +gb_trees_foreach(Fun, {Key, Val, It}) -> + Fun(Key, Val), + gb_trees_foreach(Fun, gb_trees:next(It)); +gb_trees_foreach(Fun, Tree) -> + gb_trees_foreach(Fun, gb_trees:next(gb_trees:iterator(Tree))). + gb_trees_cons(Key, Value, Tree) -> case gb_trees:lookup(Key, Tree) of {value, Values} -> gb_trees:update(Key, [Value | Values], Tree); diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 571eb5e4..1af91f4c 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -127,6 +127,8 @@ usage() -> io:format("~s", [rabbit_ctl_usage:usage()]), quit(1). +%%---------------------------------------------------------------------------- + action(stop, Node, [], _Opts, Inform) -> Inform("Stopping and halting node ~p", [Node]), call(Node, {rabbit, stop_and_halt, []}); @@ -159,6 +161,10 @@ action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) -> [Node, ClusterNodes]), rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]); +action(wait, Node, [], _Opts, Inform) -> + Inform("Waiting for ~p", [Node]), + wait_for_application(Node, ?WAIT_FOR_VM_ATTEMPTS); + action(status, Node, [], _Opts, Inform) -> Inform("Status of node ~p", [Node]), case call(Node, {rabbit, status, []}) of @@ -292,18 +298,15 @@ action(list_permissions, Node, [], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Listing permissions in vhost ~p", [VHost]), display_list(call(Node, {rabbit_auth_backend_internal, - list_vhost_permissions, [VHost]})); + list_vhost_permissions, [VHost]})). -action(wait, Node, [], _Opts, Inform) -> - Inform("Waiting for ~p", [Node]), - wait_for_application(Node, ?WAIT_FOR_VM_ATTEMPTS). +%%---------------------------------------------------------------------------- wait_for_application(Node, Attempts) -> case rpc_call(Node, application, which_applications, [infinity]) of - {badrpc, _} = E -> NewAttempts = Attempts - 1, - case NewAttempts of + {badrpc, _} = E -> case Attempts of 0 -> E; - _ -> wait_for_application0(Node, NewAttempts) + _ -> wait_for_application0(Node, Attempts - 1) end; Apps -> case proplists:is_defined(rabbit, Apps) of %% We've seen the node up; if it goes down diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index ffd1e583..c192f8cf 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -188,10 +188,10 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) -> end. trie_child(X, Node, Word) -> - case mnesia:read(rabbit_topic_trie_edge, - #trie_edge{exchange_name = X, - node_id = Node, - word = Word}) of + case mnesia:read({rabbit_topic_trie_edge, + #trie_edge{exchange_name = X, + node_id = Node, + word = Word}}) of [#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode}; [] -> error end. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 2e9563cf..1daeeb2a 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -48,8 +48,7 @@ -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). --export([recursive_delete/1, recursive_copy/2, dict_cons/3, orddict_cons/3, - unlink_and_capture_exit/1]). +-export([recursive_delete/1, recursive_copy/2, dict_cons/3, orddict_cons/3]). -export([get_options/2]). -export([all_module_attributes/1, build_acyclic_graph/3]). -export([now_ms/0]). @@ -178,7 +177,6 @@ -> rabbit_types:ok_or_error({file:filename(), file:filename(), any()})). -spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). -spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()). --spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok'). -spec(get_options/2 :: ([optdef()], [string()]) -> {[string()], [{string(), any()}]}). -spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]). @@ -749,12 +747,6 @@ dict_cons(Key, Value, Dict) -> orddict_cons(Key, Value, Dict) -> orddict:update(Key, fun (List) -> [Value | List] end, [Value], Dict). -unlink_and_capture_exit(Pid) -> - unlink(Pid), - receive {'EXIT', Pid, _} -> ok - after 0 -> ok - end. - %% Separate flags and options from arguments. %% get_options([{flag, "-q"}, {option, "-p", "/"}], %% ["set_permissions","-p","/","guest", diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index bb26de64..65688142 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -21,7 +21,7 @@ -export([start_link/4, successfully_recovered_state/1, client_init/4, client_terminate/1, client_delete_and_terminate/1, client_ref/1, close_all_indicated/1, - write/3, read/2, contains/2, remove/2, release/2, sync/3]). + write/3, read/2, contains/2, remove/2, sync/3]). -export([sync/1, set_maximum_since_use/2, has_readers/2, combine_files/3, delete_file/2]). %% internal @@ -67,7 +67,6 @@ gc_pid, %% pid of our GC file_handles_ets, %% tid of the shared file handles table file_summary_ets, %% tid of the file summary table - dedup_cache_ets, %% tid of dedup cache table cur_file_cache_ets, %% tid of current file cache table dying_clients, %% set of dying clients clients, %% map of references of all registered clients @@ -87,7 +86,6 @@ gc_pid, file_handles_ets, file_summary_ets, - dedup_cache_ets, cur_file_cache_ets }). @@ -130,7 +128,6 @@ gc_pid :: pid(), file_handles_ets :: ets:tid(), file_summary_ets :: ets:tid(), - dedup_cache_ets :: ets:tid(), cur_file_cache_ets :: ets:tid()}). -type(msg_ref_delta_gen(A) :: fun ((A) -> 'finished' | @@ -153,7 +150,6 @@ {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). -spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()). -spec(remove/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok'). --spec(release/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok'). -spec(sync/3 :: ([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok'). @@ -396,7 +392,7 @@ successfully_recovered_state(Server) -> client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> {IState, IModule, Dir, GCPid, - FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = + FileHandlesEts, FileSummaryEts, CurFileCacheEts} = gen_server2:call( Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity), #client_msstate { server = Server, @@ -408,7 +404,6 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> gc_pid = GCPid, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }. client_terminate(CState = #client_msstate { client_ref = Ref }) -> @@ -429,27 +424,16 @@ write(MsgId, Msg, ok = server_cast(CState, {write, CRef, MsgId}). read(MsgId, - CState = #client_msstate { dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts }) -> - %% 1. Check the dedup cache - case fetch_and_increment_cache(DedupCacheEts, MsgId) of - not_found -> - %% 2. Check the cur file cache - case ets:lookup(CurFileCacheEts, MsgId) of - [] -> - Defer = fun() -> - {server_call(CState, {read, MsgId}), CState} - end, - case index_lookup_positive_ref_count(MsgId, CState) of - not_found -> Defer(); - MsgLocation -> client_read1(MsgLocation, Defer, CState) - end; - [{MsgId, Msg, _CacheRefCount}] -> - %% Although we've found it, we don't know the - %% refcount, so can't insert into dedup cache - {{ok, Msg}, CState} + CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> + %% Check the cur file cache + case ets:lookup(CurFileCacheEts, MsgId) of + [] -> + Defer = fun() -> {server_call(CState, {read, MsgId}), CState} end, + case index_lookup_positive_ref_count(MsgId, CState) of + not_found -> Defer(); + MsgLocation -> client_read1(MsgLocation, Defer, CState) end; - Msg -> + [{MsgId, Msg, _CacheRefCount}] -> {{ok, Msg}, CState} end. @@ -457,8 +441,6 @@ contains(MsgId, CState) -> server_call(CState, {contains, MsgId}). remove([], _CState) -> ok; remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> server_cast(CState, {remove, CRef, MsgIds}). -release([], _CState) -> ok; -release(MsgIds, CState) -> server_cast(CState, {release, MsgIds}). sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}). sync(Server) -> @@ -517,7 +499,6 @@ client_read2(false, _Right, client_read3(#msg_location { msg_id = MsgId, file = File }, Defer, CState = #client_msstate { file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, gc_pid = GCPid, client_ref = Ref }) -> Release = @@ -574,8 +555,8 @@ client_read3(#msg_location { msg_id = MsgId, file = File }, Defer, %% Could the msg_store now mark the file to be %% closed? No: marks for closing are issued only %% when the msg_store has locked the file. - {Msg, CState2} = %% This will never be the current file - read_from_disk(MsgLocation, CState1, DedupCacheEts), + %% This will never be the current file + {Msg, CState2} = read_from_disk(MsgLocation, CState1), Release(), %% this MUST NOT fail with badarg {{ok, Msg}, CState2}; #msg_location {} = MsgLocation -> %% different file! @@ -639,7 +620,6 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> %% CleanShutdown <=> msg location index and file_summary both %% recovered correctly. - DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]), FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles, [ordered_set, public]), CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]), @@ -669,7 +649,6 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> gc_pid = GCPid, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, dying_clients = sets:new(), clients = Clients, @@ -720,14 +699,12 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, index_module = IndexModule, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, clients = Clients, gc_pid = GCPid }) -> Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients), - reply({IndexState, IndexModule, Dir, GCPid, - FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts}, - State #msstate { clients = Clients1 }); + reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, + CurFileCacheEts}, State #msstate { clients = Clients1 }); handle_call({client_terminate, CRef}, _From, State) -> reply(ok, clear_client(CRef, State)); @@ -781,12 +758,6 @@ handle_cast({remove, CRef, MsgIds}, State) -> noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds), removed, State1))); -handle_cast({release, MsgIds}, State = - #msstate { dedup_cache_ets = DedupCacheEts }) -> - lists:foreach( - fun (MsgId) -> decrement_cache(DedupCacheEts, MsgId) end, MsgIds), - noreply(State); - handle_cast({sync, MsgIds, K}, State = #msstate { current_file = CurFile, current_file_handle = CurHdl, @@ -840,7 +811,6 @@ terminate(_Reason, State = #msstate { index_state = IndexState, gc_pid = GCPid, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, clients = Clients, dir = Dir }) -> @@ -856,7 +826,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, State3 = close_all_handles(State1), ok = store_file_summary(FileSummaryEts, Dir), [true = ets:delete(T) || - T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]], + T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]], IndexModule:terminate(IndexState), ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)}, {index_module, IndexModule}], Dir), @@ -978,26 +948,18 @@ write_message(MsgId, Msg, sum_valid_data = SumValid + TotalSize, sum_file_size = SumFileSize + TotalSize }). -read_message(MsgId, From, - State = #msstate { dedup_cache_ets = DedupCacheEts }) -> +read_message(MsgId, From, State) -> case index_lookup_positive_ref_count(MsgId, State) of - not_found -> - gen_server2:reply(From, not_found), - State; - MsgLocation -> - case fetch_and_increment_cache(DedupCacheEts, MsgId) of - not_found -> read_message1(From, MsgLocation, State); - Msg -> gen_server2:reply(From, {ok, Msg}), - State - end + not_found -> gen_server2:reply(From, not_found), + State; + MsgLocation -> read_message1(From, MsgLocation, State) end. -read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount, - file = File, offset = Offset } = MsgLoc, +read_message1(From, #msg_location { msg_id = MsgId, file = File, + offset = Offset } = MsgLoc, State = #msstate { current_file = CurFile, current_file_handle = CurHdl, file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }) -> case File =:= CurFile of true -> {Msg, State1} = @@ -1010,10 +972,8 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount, true -> file_handle_cache:flush(CurHdl); false -> ok end, - read_from_disk(MsgLoc, State, DedupCacheEts); + read_from_disk(MsgLoc, State); [{MsgId, Msg1, _CacheRefCount}] -> - ok = maybe_insert_into_cache( - DedupCacheEts, RefCount, MsgId, Msg1), {Msg1, State} end, gen_server2:reply(From, {ok, Msg}), @@ -1023,17 +983,14 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount, case Locked of true -> add_to_pending_gc_completion({read, MsgId, From}, File, State); - false -> {Msg, State1} = - read_from_disk(MsgLoc, State, DedupCacheEts), + false -> {Msg, State1} = read_from_disk(MsgLoc, State), gen_server2:reply(From, {ok, Msg}), State1 end end. -read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount, - file = File, offset = Offset, - total_size = TotalSize }, - State, DedupCacheEts) -> +read_from_disk(#msg_location { msg_id = MsgId, file = File, offset = Offset, + total_size = TotalSize }, State) -> {Hdl, State1} = get_read_handle(File, State), {ok, Offset} = file_handle_cache:position(Hdl, Offset), {ok, {MsgId, Msg}} = @@ -1049,7 +1006,6 @@ read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount, {proc_dict, get()} ]}} end, - ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg), {Msg, State1}. contains_message(MsgId, From, @@ -1068,8 +1024,7 @@ contains_message(MsgId, From, end. remove_message(MsgId, CRef, - State = #msstate { file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts }) -> + State = #msstate { file_summary_ets = FileSummaryEts }) -> case should_mask_action(CRef, MsgId, State) of {true, _Location} -> State; @@ -1090,8 +1045,7 @@ remove_message(MsgId, CRef, %% don't remove from CUR_FILE_CACHE_ETS_NAME here %% because there may be further writes in the mailbox %% for the same msg. - 1 -> ok = remove_cache_entry(DedupCacheEts, MsgId), - case ets:lookup(FileSummaryEts, File) of + 1 -> case ets:lookup(FileSummaryEts, File) of [#file_summary { locked = true }] -> add_to_pending_gc_completion( {remove, MsgId, CRef}, File, State); @@ -1101,8 +1055,7 @@ remove_message(MsgId, CRef, File, adjust_valid_total_size(File, -TotalSize, State)) end; - _ -> ok = decrement_cache(DedupCacheEts, MsgId), - ok = Dec(), + _ -> ok = Dec(), State end end. @@ -1325,12 +1278,6 @@ list_sorted_file_names(Dir, Ext) -> %% message cache helper functions %%---------------------------------------------------------------------------- -maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg) - when RefCount > 1 -> - update_msg_cache(DedupCacheEts, MsgId, Msg); -maybe_insert_into_cache(_DedupCacheEts, _RefCount, _MsgId, _Msg) -> - ok. - update_msg_cache(CacheEts, MsgId, Msg) -> case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of true -> ok; @@ -1339,34 +1286,6 @@ update_msg_cache(CacheEts, MsgId, Msg) -> fun () -> update_msg_cache(CacheEts, MsgId, Msg) end) end. -remove_cache_entry(DedupCacheEts, MsgId) -> - true = ets:delete(DedupCacheEts, MsgId), - ok. - -fetch_and_increment_cache(DedupCacheEts, MsgId) -> - case ets:lookup(DedupCacheEts, MsgId) of - [] -> - not_found; - [{_MsgId, Msg, _RefCount}] -> - safe_ets_update_counter_ok( - DedupCacheEts, MsgId, {3, +1}, - %% someone has deleted us in the meantime, insert us - fun () -> ok = update_msg_cache(DedupCacheEts, MsgId, Msg) end), - Msg - end. - -decrement_cache(DedupCacheEts, MsgId) -> - true = safe_ets_update_counter( - DedupCacheEts, MsgId, {3, -1}, - fun (N) when N =< 0 -> true = ets:delete(DedupCacheEts, MsgId); - (_N) -> true - end, - %% MsgId is not in there because although it's been - %% delivered, it's never actually been read (think: - %% persistent message held in RAM) - fun () -> true end), - ok. - %%---------------------------------------------------------------------------- %% index %%---------------------------------------------------------------------------- @@ -1592,8 +1511,8 @@ build_index(Gatherer, Left, [], sum_file_size = SumFileSize }) -> case gatherer:out(Gatherer) of empty -> + unlink(Gatherer), ok = gatherer:stop(Gatherer), - ok = rabbit_misc:unlink_and_capture_exit(Gatherer), ok = index_delete_by_file(undefined, State), Offset = case ets:lookup(FileSummaryEts, Left) of [] -> 0; @@ -2007,7 +1926,10 @@ transform_msg_file(FileOld, FileNew, TransformFun) -> rabbit_msg_file:scan( RefOld, filelib:file_size(FileOld), fun({MsgId, _Size, _Offset, BinMsg}, ok) -> - {ok, MsgNew} = TransformFun(binary_to_term(BinMsg)), + {ok, MsgNew} = case binary_to_term(BinMsg) of + <<>> -> {ok, <<>>}; %% dying client marker + Msg -> TransformFun(Msg) + end, {ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew), ok end, ok), diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 367953b8..aaf3df78 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -514,8 +514,8 @@ queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) -> queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> case gatherer:out(Gatherer) of empty -> + unlink(Gatherer), ok = gatherer:stop(Gatherer), - ok = rabbit_misc:unlink_and_capture_exit(Gatherer), finished; {value, {MsgId, Count}} -> {MsgId, Count, {next, Gatherer}} @@ -1036,8 +1036,8 @@ foreach_queue_index(Funs) -> end) end || QueueDirName <- QueueDirNames], empty = gatherer:out(Gatherer), - ok = gatherer:stop(Gatherer), - ok = rabbit_misc:unlink_and_capture_exit(Gatherer). + unlink(Gatherer), + ok = gatherer:stop(Gatherer). transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) -> ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ea7d1343..294fae97 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -700,9 +700,14 @@ test_topic_expect_match(X, List) -> lists:foreach( fun ({Key, Expected}) -> BinKey = list_to_binary(Key), + Message = rabbit_basic:message(X#exchange.name, BinKey, + #'P_basic'{}, <<>>), Res = rabbit_exchange_type_topic:route( - X, #delivery{message = #basic_message{routing_keys = - [BinKey]}}), + X, #delivery{mandatory = false, + immediate = false, + txn = none, + sender = self(), + message = Message}), ExpectedRes = lists:map( fun (Q) -> #resource{virtual_host = <<"/">>, kind = queue, @@ -1781,8 +1786,6 @@ test_msg_store() -> true = msg_store_contains(true, MsgIds2ndHalf, MSCState2), %% read the second half again MSCState3 = msg_store_read(MsgIds2ndHalf, MSCState2), - %% release the second half, just for fun (aka code coverage) - ok = rabbit_msg_store:release(MsgIds2ndHalf, MSCState3), %% read the second half again, just for fun (aka code coverage) MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3), ok = rabbit_msg_store:client_terminate(MSCState4), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7a1102e5..ff7252fd 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -728,7 +728,7 @@ requeue(AckTags, MsgPropsFun, State) -> needs_confirming = false } end, a(reduce_memory_use( - ack(fun msg_store_release/3, + ack(fun (_, _, _) -> ok end, fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> {_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps), true, false, State1), @@ -972,11 +972,6 @@ msg_store_remove(MSCState, IsPersistent, MsgIds) -> MSCState, IsPersistent, fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end). -msg_store_release(MSCState, IsPersistent, MsgIds) -> - with_immutable_msg_store_state( - MSCState, IsPersistent, - fun (MCSState1) -> rabbit_msg_store:release(MsgIds, MCSState1) end). - msg_store_sync(MSCState, IsPersistent, MsgIds, Fun) -> with_immutable_msg_store_state( MSCState, IsPersistent, diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 1a240856..ec1ee9cd 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -38,6 +38,9 @@ %% child is a supervisor and it exits normally (i.e. with reason of %% 'shutdown') then the child's parent also exits normally. %% +%% 5) normal, and {shutdown, _} exit reasons are all treated the same +%% (i.e. are regarded as normal exits) +%% %% All modifications are (C) 2010-2011 VMware, Inc. %% %% %CopyrightBegin% @@ -544,17 +547,12 @@ do_restart({RestartType, Delay}, Reason, Child, State) -> do_restart(permanent, Reason, Child, State) -> report_error(child_terminated, Reason, Child, State#state.name), restart(Child, State); -do_restart(intrinsic, normal, Child, State) -> - {shutdown, state_del_child(Child, State)}; -do_restart(intrinsic, shutdown, Child = #child{child_type = supervisor}, - State) -> - {shutdown, state_del_child(Child, State)}; -do_restart(_, normal, Child, State) -> - NState = state_del_child(Child, State), - {ok, NState}; -do_restart(_, shutdown, Child, State) -> - NState = state_del_child(Child, State), - {ok, NState}; +do_restart(Type, normal, Child, State) -> + del_child_and_maybe_shutdown(Type, Child, State); +do_restart(Type, {shutdown, _}, Child, State) -> + del_child_and_maybe_shutdown(Type, Child, State); +do_restart(Type, shutdown, Child = #child{child_type = supervisor}, State) -> + del_child_and_maybe_shutdown(Type, Child, State); do_restart(Type, Reason, Child, State) when Type =:= transient orelse Type =:= intrinsic -> report_error(child_terminated, Reason, Child, State#state.name), @@ -564,6 +562,11 @@ do_restart(temporary, Reason, Child, State) -> NState = state_del_child(Child, State), {ok, NState}. +del_child_and_maybe_shutdown(intrinsic, Child, State) -> + {shutdown, state_del_child(Child, State)}; +del_child_and_maybe_shutdown(_, Child, State) -> + {ok, state_del_child(Child, State)}. + restart(Child, State) -> case add_restart(State) of {ok, NState} -> diff --git a/src/test_sup.erl b/src/test_sup.erl index b4df1fd0..150235da 100644 --- a/src/test_sup.erl +++ b/src/test_sup.erl @@ -45,8 +45,8 @@ test_supervisor_delayed_restart(SupPid) -> with_sup(RestartStrategy, Fun) -> {ok, SupPid} = supervisor2:start_link(?MODULE, [RestartStrategy]), Res = Fun(SupPid), + unlink(SupPid), exit(SupPid, shutdown), - rabbit_misc:unlink_and_capture_exit(SupPid), Res. init([RestartStrategy]) -> |