summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-04-05 14:09:54 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-04-05 14:09:54 +0100
commit0bad583b1b18d6c8be7eec03e1ad8e9add149214 (patch)
treec6893a20e2c678d80a85420f17c2b7e19a655835
parent9298d8a1dd79f862964a1d101908562b49fc91f6 (diff)
parentf4b10422c87885a746ac509c68a677f18bc60296 (diff)
downloadrabbitmq-server-0bad583b1b18d6c8be7eec03e1ad8e9add149214.tar.gz
merge heads
-rwxr-xr-xpackaging/common/rabbitmq-server.ocf8
-rwxr-xr-xscripts/rabbitmq-env3
-rw-r--r--src/gm.erl11
-rw-r--r--src/gm_soak_test.erl7
-rw-r--r--src/rabbit_amqqueue_process.erl14
-rw-r--r--src/rabbit_control.erl17
-rw-r--r--src/rabbit_exchange_type_topic.erl8
-rw-r--r--src/rabbit_misc.erl10
-rw-r--r--src/rabbit_msg_store.erl146
-rw-r--r--src/rabbit_queue_index.erl6
-rw-r--r--src/rabbit_tests.erl11
-rw-r--r--src/rabbit_variable_queue.erl7
-rw-r--r--src/supervisor2.erl25
-rw-r--r--src/test_sup.erl2
14 files changed, 102 insertions, 173 deletions
diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf
index 94999d0e..d58c48ed 100755
--- a/packaging/common/rabbitmq-server.ocf
+++ b/packaging/common/rabbitmq-server.ocf
@@ -103,9 +103,9 @@ The IP Port for rabbitmq-server to listen on
<parameter name="config_file" unique="0" required="0">
<longdesc lang="en">
-Location of the config file
+Location of the config file (without the .config suffix)
</longdesc>
-<shortdesc lang="en">Config file path</shortdesc>
+<shortdesc lang="en">Config file path (without the .config suffix)</shortdesc>
<content type="string" default="" />
</parameter>
@@ -189,8 +189,8 @@ rabbit_validate_partial() {
}
rabbit_validate_full() {
- if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e $RABBITMQ_CONFIG_FILE ]; then
- ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file";
+ if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e "${RABBITMQ_CONFIG_FILE}.config" ]; then
+ ocf_log err "rabbitmq-server config_file ${RABBITMQ_CONFIG_FILE}.config does not exist or is not a file";
exit $OCF_ERR_INSTALLED;
fi
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index 3e173949..a2ef8d3c 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -37,7 +37,8 @@ RABBITMQ_HOME="${SCRIPT_DIR}/.."
NODENAME=rabbit@${HOSTNAME%%.*}
# Load configuration from the rabbitmq.conf file
-if [ -f /etc/rabbitmq/rabbitmq.conf ]; then
+if [ -f /etc/rabbitmq/rabbitmq.conf ] && \
+ [ ! -f /etc/rabbitmq/rabbitmq-env.conf ] ; then
echo -n "WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- "
echo "location has moved to /etc/rabbitmq/rabbitmq-env.conf"
fi
diff --git a/src/gm.erl b/src/gm.erl
index 5b3623cf..8b7dc70c 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -516,7 +516,8 @@ flush(Server) ->
init([GroupName, Module, Args]) ->
- random:seed(now()),
+ {MegaSecs, Secs, MicroSecs} = now(),
+ random:seed(MegaSecs, Secs, MicroSecs),
gen_server2:cast(self(), join),
Self = self(),
{ok, #state { self = Self,
@@ -1010,7 +1011,7 @@ prune_or_create_group(Self, GroupName) ->
fun () -> GroupNew = #gm_group { name = GroupName,
members = [Self],
version = 0 },
- case mnesia:read(?GROUP_TABLE, GroupName) of
+ case mnesia:read({?GROUP_TABLE, GroupName}) of
[] ->
mnesia:write(GroupNew),
GroupNew;
@@ -1028,7 +1029,7 @@ record_dead_member_in_group(Member, GroupName) ->
{atomic, Group} =
mnesia:sync_transaction(
fun () -> [Group1 = #gm_group { members = Members, version = Ver }] =
- mnesia:read(?GROUP_TABLE, GroupName),
+ mnesia:read({?GROUP_TABLE, GroupName}),
case lists:splitwith(
fun (Member1) -> Member1 =/= Member end, Members) of
{_Members1, []} -> %% not found - already recorded dead
@@ -1048,7 +1049,7 @@ record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
mnesia:sync_transaction(
fun () ->
[#gm_group { members = Members, version = Ver } = Group1] =
- mnesia:read(?GROUP_TABLE, GroupName),
+ mnesia:read({?GROUP_TABLE, GroupName}),
{Prefix, [Left | Suffix]} =
lists:splitwith(fun (M) -> M =/= Left end, Members),
Members1 = Prefix ++ [Left, NewMember | Suffix],
@@ -1067,7 +1068,7 @@ erase_members_in_group(Members, GroupName) ->
fun () ->
[Group1 = #gm_group { members = [_|_] = Members1,
version = Ver }] =
- mnesia:read(?GROUP_TABLE, GroupName),
+ mnesia:read({?GROUP_TABLE, GroupName}),
case Members1 -- DeadMembers of
Members1 -> Group1;
Members2 -> Group2 =
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index 1f8832a6..dae42ac7 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -35,7 +35,7 @@ with_state(Fun) ->
inc() ->
case 1 + get(count) of
- 100000 -> Now = os:timestamp(),
+ 100000 -> Now = now(),
Start = put(ts, Now),
Diff = timer:now_diff(Now, Start),
Rate = 100000 / (Diff / 1000000),
@@ -48,7 +48,7 @@ joined([], Members) ->
io:format("Joined ~p (~p members)~n", [self(), length(Members)]),
put(state, dict:from_list([{Member, empty} || Member <- Members])),
put(count, 0),
- put(ts, os:timestamp()),
+ put(ts, now()),
ok.
members_changed([], Births, Deaths) ->
@@ -101,7 +101,8 @@ terminate([], Reason) ->
spawn_member() ->
spawn_link(
fun () ->
- random:seed(now()),
+ {MegaSecs, Secs, MicroSecs} = now(),
+ random:seed(MegaSecs, Secs, MicroSecs),
%% start up delay of no more than 10 seconds
timer:sleep(random:uniform(10000)),
{ok, Pid} = gm:start_link(?MODULE, ?MODULE, []),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3f5758ce..2b0fe17e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -428,11 +428,19 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
{CMs, MTC0}
end
end, {gb_trees:empty(), MTC}, MsgIds),
- gb_trees:map(fun(ChPid, MsgSeqNos) ->
- rabbit_channel:confirm(ChPid, MsgSeqNos)
- end, CMs),
+ gb_trees_foreach(fun(ChPid, MsgSeqNos) ->
+ rabbit_channel:confirm(ChPid, MsgSeqNos)
+ end, CMs),
State#q{msg_id_to_channel = MTC1}.
+gb_trees_foreach(_, none) ->
+ ok;
+gb_trees_foreach(Fun, {Key, Val, It}) ->
+ Fun(Key, Val),
+ gb_trees_foreach(Fun, gb_trees:next(It));
+gb_trees_foreach(Fun, Tree) ->
+ gb_trees_foreach(Fun, gb_trees:next(gb_trees:iterator(Tree))).
+
gb_trees_cons(Key, Value, Tree) ->
case gb_trees:lookup(Key, Tree) of
{value, Values} -> gb_trees:update(Key, [Value | Values], Tree);
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 571eb5e4..1af91f4c 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -127,6 +127,8 @@ usage() ->
io:format("~s", [rabbit_ctl_usage:usage()]),
quit(1).
+%%----------------------------------------------------------------------------
+
action(stop, Node, [], _Opts, Inform) ->
Inform("Stopping and halting node ~p", [Node]),
call(Node, {rabbit, stop_and_halt, []});
@@ -159,6 +161,10 @@ action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
[Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]);
+action(wait, Node, [], _Opts, Inform) ->
+ Inform("Waiting for ~p", [Node]),
+ wait_for_application(Node, ?WAIT_FOR_VM_ATTEMPTS);
+
action(status, Node, [], _Opts, Inform) ->
Inform("Status of node ~p", [Node]),
case call(Node, {rabbit, status, []}) of
@@ -292,18 +298,15 @@ action(list_permissions, Node, [], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Listing permissions in vhost ~p", [VHost]),
display_list(call(Node, {rabbit_auth_backend_internal,
- list_vhost_permissions, [VHost]}));
+ list_vhost_permissions, [VHost]})).
-action(wait, Node, [], _Opts, Inform) ->
- Inform("Waiting for ~p", [Node]),
- wait_for_application(Node, ?WAIT_FOR_VM_ATTEMPTS).
+%%----------------------------------------------------------------------------
wait_for_application(Node, Attempts) ->
case rpc_call(Node, application, which_applications, [infinity]) of
- {badrpc, _} = E -> NewAttempts = Attempts - 1,
- case NewAttempts of
+ {badrpc, _} = E -> case Attempts of
0 -> E;
- _ -> wait_for_application0(Node, NewAttempts)
+ _ -> wait_for_application0(Node, Attempts - 1)
end;
Apps -> case proplists:is_defined(rabbit, Apps) of
%% We've seen the node up; if it goes down
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index ffd1e583..c192f8cf 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -188,10 +188,10 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) ->
end.
trie_child(X, Node, Word) ->
- case mnesia:read(rabbit_topic_trie_edge,
- #trie_edge{exchange_name = X,
- node_id = Node,
- word = Word}) of
+ case mnesia:read({rabbit_topic_trie_edge,
+ #trie_edge{exchange_name = X,
+ node_id = Node,
+ word = Word}}) of
[#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode};
[] -> error
end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 2e9563cf..1daeeb2a 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -48,8 +48,7 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
--export([recursive_delete/1, recursive_copy/2, dict_cons/3, orddict_cons/3,
- unlink_and_capture_exit/1]).
+-export([recursive_delete/1, recursive_copy/2, dict_cons/3, orddict_cons/3]).
-export([get_options/2]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
@@ -178,7 +177,6 @@
-> rabbit_types:ok_or_error({file:filename(), file:filename(), any()})).
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
--spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok').
-spec(get_options/2 :: ([optdef()], [string()])
-> {[string()], [{string(), any()}]}).
-spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
@@ -749,12 +747,6 @@ dict_cons(Key, Value, Dict) ->
orddict_cons(Key, Value, Dict) ->
orddict:update(Key, fun (List) -> [Value | List] end, [Value], Dict).
-unlink_and_capture_exit(Pid) ->
- unlink(Pid),
- receive {'EXIT', Pid, _} -> ok
- after 0 -> ok
- end.
-
%% Separate flags and options from arguments.
%% get_options([{flag, "-q"}, {option, "-p", "/"}],
%% ["set_permissions","-p","/","guest",
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index bb26de64..65688142 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -21,7 +21,7 @@
-export([start_link/4, successfully_recovered_state/1,
client_init/4, client_terminate/1, client_delete_and_terminate/1,
client_ref/1, close_all_indicated/1,
- write/3, read/2, contains/2, remove/2, release/2, sync/3]).
+ write/3, read/2, contains/2, remove/2, sync/3]).
-export([sync/1, set_maximum_since_use/2,
has_readers/2, combine_files/3, delete_file/2]). %% internal
@@ -67,7 +67,6 @@
gc_pid, %% pid of our GC
file_handles_ets, %% tid of the shared file handles table
file_summary_ets, %% tid of the file summary table
- dedup_cache_ets, %% tid of dedup cache table
cur_file_cache_ets, %% tid of current file cache table
dying_clients, %% set of dying clients
clients, %% map of references of all registered clients
@@ -87,7 +86,6 @@
gc_pid,
file_handles_ets,
file_summary_ets,
- dedup_cache_ets,
cur_file_cache_ets
}).
@@ -130,7 +128,6 @@
gc_pid :: pid(),
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
- dedup_cache_ets :: ets:tid(),
cur_file_cache_ets :: ets:tid()}).
-type(msg_ref_delta_gen(A) ::
fun ((A) -> 'finished' |
@@ -153,7 +150,6 @@
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
-spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()).
-spec(remove/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok').
--spec(release/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok').
-spec(sync/3 ::
([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok').
@@ -396,7 +392,7 @@ successfully_recovered_state(Server) ->
client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
{IState, IModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
+ FileHandlesEts, FileSummaryEts, CurFileCacheEts} =
gen_server2:call(
Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
#client_msstate { server = Server,
@@ -408,7 +404,6 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }.
client_terminate(CState = #client_msstate { client_ref = Ref }) ->
@@ -429,27 +424,16 @@ write(MsgId, Msg,
ok = server_cast(CState, {write, CRef, MsgId}).
read(MsgId,
- CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
- %% 1. Check the dedup cache
- case fetch_and_increment_cache(DedupCacheEts, MsgId) of
- not_found ->
- %% 2. Check the cur file cache
- case ets:lookup(CurFileCacheEts, MsgId) of
- [] ->
- Defer = fun() ->
- {server_call(CState, {read, MsgId}), CState}
- end,
- case index_lookup_positive_ref_count(MsgId, CState) of
- not_found -> Defer();
- MsgLocation -> client_read1(MsgLocation, Defer, CState)
- end;
- [{MsgId, Msg, _CacheRefCount}] ->
- %% Although we've found it, we don't know the
- %% refcount, so can't insert into dedup cache
- {{ok, Msg}, CState}
+ CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ %% Check the cur file cache
+ case ets:lookup(CurFileCacheEts, MsgId) of
+ [] ->
+ Defer = fun() -> {server_call(CState, {read, MsgId}), CState} end,
+ case index_lookup_positive_ref_count(MsgId, CState) of
+ not_found -> Defer();
+ MsgLocation -> client_read1(MsgLocation, Defer, CState)
end;
- Msg ->
+ [{MsgId, Msg, _CacheRefCount}] ->
{{ok, Msg}, CState}
end.
@@ -457,8 +441,6 @@ contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
remove([], _CState) -> ok;
remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
server_cast(CState, {remove, CRef, MsgIds}).
-release([], _CState) -> ok;
-release(MsgIds, CState) -> server_cast(CState, {release, MsgIds}).
sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}).
sync(Server) ->
@@ -517,7 +499,6 @@ client_read2(false, _Right,
client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
CState = #client_msstate { file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
gc_pid = GCPid,
client_ref = Ref }) ->
Release =
@@ -574,8 +555,8 @@ client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
%% Could the msg_store now mark the file to be
%% closed? No: marks for closing are issued only
%% when the msg_store has locked the file.
- {Msg, CState2} = %% This will never be the current file
- read_from_disk(MsgLocation, CState1, DedupCacheEts),
+ %% This will never be the current file
+ {Msg, CState2} = read_from_disk(MsgLocation, CState1),
Release(), %% this MUST NOT fail with badarg
{{ok, Msg}, CState2};
#msg_location {} = MsgLocation -> %% different file!
@@ -639,7 +620,6 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
%% CleanShutdown <=> msg location index and file_summary both
%% recovered correctly.
- DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]),
FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
[ordered_set, public]),
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
@@ -669,7 +649,6 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
dying_clients = sets:new(),
clients = Clients,
@@ -720,14 +699,12 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
index_module = IndexModule,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
clients = Clients,
gc_pid = GCPid }) ->
Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
- reply({IndexState, IndexModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
- State #msstate { clients = Clients1 });
+ reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
+ CurFileCacheEts}, State #msstate { clients = Clients1 });
handle_call({client_terminate, CRef}, _From, State) ->
reply(ok, clear_client(CRef, State));
@@ -781,12 +758,6 @@ handle_cast({remove, CRef, MsgIds}, State) ->
noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds),
removed, State1)));
-handle_cast({release, MsgIds}, State =
- #msstate { dedup_cache_ets = DedupCacheEts }) ->
- lists:foreach(
- fun (MsgId) -> decrement_cache(DedupCacheEts, MsgId) end, MsgIds),
- noreply(State);
-
handle_cast({sync, MsgIds, K},
State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
@@ -840,7 +811,6 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
clients = Clients,
dir = Dir }) ->
@@ -856,7 +826,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
State3 = close_all_handles(State1),
ok = store_file_summary(FileSummaryEts, Dir),
[true = ets:delete(T) ||
- T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]],
+ T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]],
IndexModule:terminate(IndexState),
ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
{index_module, IndexModule}], Dir),
@@ -978,26 +948,18 @@ write_message(MsgId, Msg,
sum_valid_data = SumValid + TotalSize,
sum_file_size = SumFileSize + TotalSize }).
-read_message(MsgId, From,
- State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
+read_message(MsgId, From, State) ->
case index_lookup_positive_ref_count(MsgId, State) of
- not_found ->
- gen_server2:reply(From, not_found),
- State;
- MsgLocation ->
- case fetch_and_increment_cache(DedupCacheEts, MsgId) of
- not_found -> read_message1(From, MsgLocation, State);
- Msg -> gen_server2:reply(From, {ok, Msg}),
- State
- end
+ not_found -> gen_server2:reply(From, not_found),
+ State;
+ MsgLocation -> read_message1(From, MsgLocation, State)
end.
-read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
- file = File, offset = Offset } = MsgLoc,
+read_message1(From, #msg_location { msg_id = MsgId, file = File,
+ offset = Offset } = MsgLoc,
State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }) ->
case File =:= CurFile of
true -> {Msg, State1} =
@@ -1010,10 +972,8 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
true -> file_handle_cache:flush(CurHdl);
false -> ok
end,
- read_from_disk(MsgLoc, State, DedupCacheEts);
+ read_from_disk(MsgLoc, State);
[{MsgId, Msg1, _CacheRefCount}] ->
- ok = maybe_insert_into_cache(
- DedupCacheEts, RefCount, MsgId, Msg1),
{Msg1, State}
end,
gen_server2:reply(From, {ok, Msg}),
@@ -1023,17 +983,14 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
case Locked of
true -> add_to_pending_gc_completion({read, MsgId, From},
File, State);
- false -> {Msg, State1} =
- read_from_disk(MsgLoc, State, DedupCacheEts),
+ false -> {Msg, State1} = read_from_disk(MsgLoc, State),
gen_server2:reply(From, {ok, Msg}),
State1
end
end.
-read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount,
- file = File, offset = Offset,
- total_size = TotalSize },
- State, DedupCacheEts) ->
+read_from_disk(#msg_location { msg_id = MsgId, file = File, offset = Offset,
+ total_size = TotalSize }, State) ->
{Hdl, State1} = get_read_handle(File, State),
{ok, Offset} = file_handle_cache:position(Hdl, Offset),
{ok, {MsgId, Msg}} =
@@ -1049,7 +1006,6 @@ read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount,
{proc_dict, get()}
]}}
end,
- ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg),
{Msg, State1}.
contains_message(MsgId, From,
@@ -1068,8 +1024,7 @@ contains_message(MsgId, From,
end.
remove_message(MsgId, CRef,
- State = #msstate { file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts }) ->
+ State = #msstate { file_summary_ets = FileSummaryEts }) ->
case should_mask_action(CRef, MsgId, State) of
{true, _Location} ->
State;
@@ -1090,8 +1045,7 @@ remove_message(MsgId, CRef,
%% don't remove from CUR_FILE_CACHE_ETS_NAME here
%% because there may be further writes in the mailbox
%% for the same msg.
- 1 -> ok = remove_cache_entry(DedupCacheEts, MsgId),
- case ets:lookup(FileSummaryEts, File) of
+ 1 -> case ets:lookup(FileSummaryEts, File) of
[#file_summary { locked = true }] ->
add_to_pending_gc_completion(
{remove, MsgId, CRef}, File, State);
@@ -1101,8 +1055,7 @@ remove_message(MsgId, CRef,
File, adjust_valid_total_size(File, -TotalSize,
State))
end;
- _ -> ok = decrement_cache(DedupCacheEts, MsgId),
- ok = Dec(),
+ _ -> ok = Dec(),
State
end
end.
@@ -1325,12 +1278,6 @@ list_sorted_file_names(Dir, Ext) ->
%% message cache helper functions
%%----------------------------------------------------------------------------
-maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg)
- when RefCount > 1 ->
- update_msg_cache(DedupCacheEts, MsgId, Msg);
-maybe_insert_into_cache(_DedupCacheEts, _RefCount, _MsgId, _Msg) ->
- ok.
-
update_msg_cache(CacheEts, MsgId, Msg) ->
case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of
true -> ok;
@@ -1339,34 +1286,6 @@ update_msg_cache(CacheEts, MsgId, Msg) ->
fun () -> update_msg_cache(CacheEts, MsgId, Msg) end)
end.
-remove_cache_entry(DedupCacheEts, MsgId) ->
- true = ets:delete(DedupCacheEts, MsgId),
- ok.
-
-fetch_and_increment_cache(DedupCacheEts, MsgId) ->
- case ets:lookup(DedupCacheEts, MsgId) of
- [] ->
- not_found;
- [{_MsgId, Msg, _RefCount}] ->
- safe_ets_update_counter_ok(
- DedupCacheEts, MsgId, {3, +1},
- %% someone has deleted us in the meantime, insert us
- fun () -> ok = update_msg_cache(DedupCacheEts, MsgId, Msg) end),
- Msg
- end.
-
-decrement_cache(DedupCacheEts, MsgId) ->
- true = safe_ets_update_counter(
- DedupCacheEts, MsgId, {3, -1},
- fun (N) when N =< 0 -> true = ets:delete(DedupCacheEts, MsgId);
- (_N) -> true
- end,
- %% MsgId is not in there because although it's been
- %% delivered, it's never actually been read (think:
- %% persistent message held in RAM)
- fun () -> true end),
- ok.
-
%%----------------------------------------------------------------------------
%% index
%%----------------------------------------------------------------------------
@@ -1592,8 +1511,8 @@ build_index(Gatherer, Left, [],
sum_file_size = SumFileSize }) ->
case gatherer:out(Gatherer) of
empty ->
+ unlink(Gatherer),
ok = gatherer:stop(Gatherer),
- ok = rabbit_misc:unlink_and_capture_exit(Gatherer),
ok = index_delete_by_file(undefined, State),
Offset = case ets:lookup(FileSummaryEts, Left) of
[] -> 0;
@@ -2007,7 +1926,10 @@ transform_msg_file(FileOld, FileNew, TransformFun) ->
rabbit_msg_file:scan(
RefOld, filelib:file_size(FileOld),
fun({MsgId, _Size, _Offset, BinMsg}, ok) ->
- {ok, MsgNew} = TransformFun(binary_to_term(BinMsg)),
+ {ok, MsgNew} = case binary_to_term(BinMsg) of
+ <<>> -> {ok, <<>>}; %% dying client marker
+ Msg -> TransformFun(Msg)
+ end,
{ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew),
ok
end, ok),
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 367953b8..aaf3df78 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -514,8 +514,8 @@ queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) ->
queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
case gatherer:out(Gatherer) of
empty ->
+ unlink(Gatherer),
ok = gatherer:stop(Gatherer),
- ok = rabbit_misc:unlink_and_capture_exit(Gatherer),
finished;
{value, {MsgId, Count}} ->
{MsgId, Count, {next, Gatherer}}
@@ -1036,8 +1036,8 @@ foreach_queue_index(Funs) ->
end)
end || QueueDirName <- QueueDirNames],
empty = gatherer:out(Gatherer),
- ok = gatherer:stop(Gatherer),
- ok = rabbit_misc:unlink_and_capture_exit(Gatherer).
+ unlink(Gatherer),
+ ok = gatherer:stop(Gatherer).
transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ea7d1343..294fae97 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -700,9 +700,14 @@ test_topic_expect_match(X, List) ->
lists:foreach(
fun ({Key, Expected}) ->
BinKey = list_to_binary(Key),
+ Message = rabbit_basic:message(X#exchange.name, BinKey,
+ #'P_basic'{}, <<>>),
Res = rabbit_exchange_type_topic:route(
- X, #delivery{message = #basic_message{routing_keys =
- [BinKey]}}),
+ X, #delivery{mandatory = false,
+ immediate = false,
+ txn = none,
+ sender = self(),
+ message = Message}),
ExpectedRes = lists:map(
fun (Q) -> #resource{virtual_host = <<"/">>,
kind = queue,
@@ -1781,8 +1786,6 @@ test_msg_store() ->
true = msg_store_contains(true, MsgIds2ndHalf, MSCState2),
%% read the second half again
MSCState3 = msg_store_read(MsgIds2ndHalf, MSCState2),
- %% release the second half, just for fun (aka code coverage)
- ok = rabbit_msg_store:release(MsgIds2ndHalf, MSCState3),
%% read the second half again, just for fun (aka code coverage)
MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3),
ok = rabbit_msg_store:client_terminate(MSCState4),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7a1102e5..ff7252fd 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -728,7 +728,7 @@ requeue(AckTags, MsgPropsFun, State) ->
needs_confirming = false }
end,
a(reduce_memory_use(
- ack(fun msg_store_release/3,
+ ack(fun (_, _, _) -> ok end,
fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
{_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps),
true, false, State1),
@@ -972,11 +972,6 @@ msg_store_remove(MSCState, IsPersistent, MsgIds) ->
MSCState, IsPersistent,
fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end).
-msg_store_release(MSCState, IsPersistent, MsgIds) ->
- with_immutable_msg_store_state(
- MSCState, IsPersistent,
- fun (MCSState1) -> rabbit_msg_store:release(MsgIds, MCSState1) end).
-
msg_store_sync(MSCState, IsPersistent, MsgIds, Fun) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 1a240856..ec1ee9cd 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -38,6 +38,9 @@
%% child is a supervisor and it exits normally (i.e. with reason of
%% 'shutdown') then the child's parent also exits normally.
%%
+%% 5) normal, and {shutdown, _} exit reasons are all treated the same
+%% (i.e. are regarded as normal exits)
+%%
%% All modifications are (C) 2010-2011 VMware, Inc.
%%
%% %CopyrightBegin%
@@ -544,17 +547,12 @@ do_restart({RestartType, Delay}, Reason, Child, State) ->
do_restart(permanent, Reason, Child, State) ->
report_error(child_terminated, Reason, Child, State#state.name),
restart(Child, State);
-do_restart(intrinsic, normal, Child, State) ->
- {shutdown, state_del_child(Child, State)};
-do_restart(intrinsic, shutdown, Child = #child{child_type = supervisor},
- State) ->
- {shutdown, state_del_child(Child, State)};
-do_restart(_, normal, Child, State) ->
- NState = state_del_child(Child, State),
- {ok, NState};
-do_restart(_, shutdown, Child, State) ->
- NState = state_del_child(Child, State),
- {ok, NState};
+do_restart(Type, normal, Child, State) ->
+ del_child_and_maybe_shutdown(Type, Child, State);
+do_restart(Type, {shutdown, _}, Child, State) ->
+ del_child_and_maybe_shutdown(Type, Child, State);
+do_restart(Type, shutdown, Child = #child{child_type = supervisor}, State) ->
+ del_child_and_maybe_shutdown(Type, Child, State);
do_restart(Type, Reason, Child, State) when Type =:= transient orelse
Type =:= intrinsic ->
report_error(child_terminated, Reason, Child, State#state.name),
@@ -564,6 +562,11 @@ do_restart(temporary, Reason, Child, State) ->
NState = state_del_child(Child, State),
{ok, NState}.
+del_child_and_maybe_shutdown(intrinsic, Child, State) ->
+ {shutdown, state_del_child(Child, State)};
+del_child_and_maybe_shutdown(_, Child, State) ->
+ {ok, state_del_child(Child, State)}.
+
restart(Child, State) ->
case add_restart(State) of
{ok, NState} ->
diff --git a/src/test_sup.erl b/src/test_sup.erl
index b4df1fd0..150235da 100644
--- a/src/test_sup.erl
+++ b/src/test_sup.erl
@@ -45,8 +45,8 @@ test_supervisor_delayed_restart(SupPid) ->
with_sup(RestartStrategy, Fun) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, [RestartStrategy]),
Res = Fun(SupPid),
+ unlink(SupPid),
exit(SupPid, shutdown),
- rabbit_misc:unlink_and_capture_exit(SupPid),
Res.
init([RestartStrategy]) ->