summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-04-08 14:57:26 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-04-08 14:57:26 +0100
commit16d52ee78e6924f4631edae6b9b0221efcd1d918 (patch)
treecc569819a2134b6510b0e3c59867d9080585b14a
parenta10d90887efdd9eb0f8a588a8a8c94b15d1eb0aa (diff)
parentd598087432f43101dcd8eec6a04ae8f5159d90b8 (diff)
downloadrabbitmq-server-16d52ee78e6924f4631edae6b9b0221efcd1d918.tar.gz
merge bug24006 into default
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rwxr-xr-xscripts/rabbitmq-env3
-rw-r--r--src/gm.erl11
-rw-r--r--src/gm_soak_test.erl7
-rw-r--r--src/rabbit_amqqueue_process.erl14
-rw-r--r--src/rabbit_channel.erl1
-rw-r--r--src/rabbit_control.erl5
-rw-r--r--src/rabbit_exchange_type_topic.erl8
-rw-r--r--src/rabbit_misc.erl10
-rw-r--r--src/rabbit_msg_store.erl151
-rw-r--r--src/rabbit_prelaunch.erl9
-rw-r--r--src/rabbit_queue_index.erl6
-rw-r--r--src/rabbit_reader.erl3
-rw-r--r--src/rabbit_ssl.erl83
-rw-r--r--src/rabbit_tests.erl2
-rw-r--r--src/rabbit_variable_queue.erl7
-rw-r--r--src/supervisor2.erl25
-rw-r--r--src/test_sup.erl2
19 files changed, 184 insertions, 172 deletions
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 45af770a..f9e9df8b 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -120,6 +120,9 @@ done
rm -rf %{buildroot}
%changelog
+* Thu Apr 7 2011 Alexandru Scvortov <alexandru@rabbitmq.com> 2.4.1-1
+- New Upstream Release
+
* Tue Mar 22 2011 Alexandru Scvortov <alexandru@rabbitmq.com> 2.4.0-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 2ca5074f..0383b955 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (2.4.1-1) lucid; urgency=low
+
+ * New Upstream Release
+
+ -- Alexandru Scvortov <alexandru@rabbitmq.com> Thu, 07 Apr 2011 16:49:22 +0100
+
rabbitmq-server (2.4.0-1) lucid; urgency=low
* New Upstream Release
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index 3e173949..a2ef8d3c 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -37,7 +37,8 @@ RABBITMQ_HOME="${SCRIPT_DIR}/.."
NODENAME=rabbit@${HOSTNAME%%.*}
# Load configuration from the rabbitmq.conf file
-if [ -f /etc/rabbitmq/rabbitmq.conf ]; then
+if [ -f /etc/rabbitmq/rabbitmq.conf ] && \
+ [ ! -f /etc/rabbitmq/rabbitmq-env.conf ] ; then
echo -n "WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- "
echo "location has moved to /etc/rabbitmq/rabbitmq-env.conf"
fi
diff --git a/src/gm.erl b/src/gm.erl
index 5b3623cf..8b7dc70c 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -516,7 +516,8 @@ flush(Server) ->
init([GroupName, Module, Args]) ->
- random:seed(now()),
+ {MegaSecs, Secs, MicroSecs} = now(),
+ random:seed(MegaSecs, Secs, MicroSecs),
gen_server2:cast(self(), join),
Self = self(),
{ok, #state { self = Self,
@@ -1010,7 +1011,7 @@ prune_or_create_group(Self, GroupName) ->
fun () -> GroupNew = #gm_group { name = GroupName,
members = [Self],
version = 0 },
- case mnesia:read(?GROUP_TABLE, GroupName) of
+ case mnesia:read({?GROUP_TABLE, GroupName}) of
[] ->
mnesia:write(GroupNew),
GroupNew;
@@ -1028,7 +1029,7 @@ record_dead_member_in_group(Member, GroupName) ->
{atomic, Group} =
mnesia:sync_transaction(
fun () -> [Group1 = #gm_group { members = Members, version = Ver }] =
- mnesia:read(?GROUP_TABLE, GroupName),
+ mnesia:read({?GROUP_TABLE, GroupName}),
case lists:splitwith(
fun (Member1) -> Member1 =/= Member end, Members) of
{_Members1, []} -> %% not found - already recorded dead
@@ -1048,7 +1049,7 @@ record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
mnesia:sync_transaction(
fun () ->
[#gm_group { members = Members, version = Ver } = Group1] =
- mnesia:read(?GROUP_TABLE, GroupName),
+ mnesia:read({?GROUP_TABLE, GroupName}),
{Prefix, [Left | Suffix]} =
lists:splitwith(fun (M) -> M =/= Left end, Members),
Members1 = Prefix ++ [Left, NewMember | Suffix],
@@ -1067,7 +1068,7 @@ erase_members_in_group(Members, GroupName) ->
fun () ->
[Group1 = #gm_group { members = [_|_] = Members1,
version = Ver }] =
- mnesia:read(?GROUP_TABLE, GroupName),
+ mnesia:read({?GROUP_TABLE, GroupName}),
case Members1 -- DeadMembers of
Members1 -> Group1;
Members2 -> Group2 =
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index 1f8832a6..dae42ac7 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -35,7 +35,7 @@ with_state(Fun) ->
inc() ->
case 1 + get(count) of
- 100000 -> Now = os:timestamp(),
+ 100000 -> Now = now(),
Start = put(ts, Now),
Diff = timer:now_diff(Now, Start),
Rate = 100000 / (Diff / 1000000),
@@ -48,7 +48,7 @@ joined([], Members) ->
io:format("Joined ~p (~p members)~n", [self(), length(Members)]),
put(state, dict:from_list([{Member, empty} || Member <- Members])),
put(count, 0),
- put(ts, os:timestamp()),
+ put(ts, now()),
ok.
members_changed([], Births, Deaths) ->
@@ -101,7 +101,8 @@ terminate([], Reason) ->
spawn_member() ->
spawn_link(
fun () ->
- random:seed(now()),
+ {MegaSecs, Secs, MicroSecs} = now(),
+ random:seed(MegaSecs, Secs, MicroSecs),
%% start up delay of no more than 10 seconds
timer:sleep(random:uniform(10000)),
{ok, Pid} = gm:start_link(?MODULE, ?MODULE, []),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3f5758ce..2b0fe17e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -428,11 +428,19 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
{CMs, MTC0}
end
end, {gb_trees:empty(), MTC}, MsgIds),
- gb_trees:map(fun(ChPid, MsgSeqNos) ->
- rabbit_channel:confirm(ChPid, MsgSeqNos)
- end, CMs),
+ gb_trees_foreach(fun(ChPid, MsgSeqNos) ->
+ rabbit_channel:confirm(ChPid, MsgSeqNos)
+ end, CMs),
State#q{msg_id_to_channel = MTC1}.
+gb_trees_foreach(_, none) ->
+ ok;
+gb_trees_foreach(Fun, {Key, Val, It}) ->
+ Fun(Key, Val),
+ gb_trees_foreach(Fun, gb_trees:next(It));
+gb_trees_foreach(Fun, Tree) ->
+ gb_trees_foreach(Fun, gb_trees:next(gb_trees:iterator(Tree))).
+
gb_trees_cons(Key, Value, Tree) ->
case gb_trees:lookup(Key, Tree) of
{value, Values} -> gb_trees:update(Key, [Value | Values], Tree);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5099bf3f..0c12614c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -156,6 +156,7 @@ ready_for_close(Pid) ->
init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
Capabilities, CollectorPid, StartLimiterFun]) ->
+ process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
StatsTimer = rabbit_event:init_stats_timer(),
State = #ch{state = starting,
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6fb465b5..1af91f4c 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -304,10 +304,9 @@ action(list_permissions, Node, [], Opts, Inform) ->
wait_for_application(Node, Attempts) ->
case rpc_call(Node, application, which_applications, [infinity]) of
- {badrpc, _} = E -> NewAttempts = Attempts - 1,
- case NewAttempts of
+ {badrpc, _} = E -> case Attempts of
0 -> E;
- _ -> wait_for_application0(Node, NewAttempts)
+ _ -> wait_for_application0(Node, Attempts - 1)
end;
Apps -> case proplists:is_defined(rabbit, Apps) of
%% We've seen the node up; if it goes down
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 5cec5b41..74c566b8 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -182,10 +182,10 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) ->
end.
trie_child(X, Node, Word) ->
- case mnesia:read(rabbit_topic_trie_edge,
- #trie_edge{exchange_name = X,
- node_id = Node,
- word = Word}) of
+ case mnesia:read({rabbit_topic_trie_edge,
+ #trie_edge{exchange_name = X,
+ node_id = Node,
+ word = Word}}) of
[#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode};
[] -> error
end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index adc3ae66..85e08615 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -48,8 +48,7 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
--export([recursive_delete/1, recursive_copy/2, dict_cons/3, orddict_cons/3,
- unlink_and_capture_exit/1]).
+-export([recursive_delete/1, recursive_copy/2, dict_cons/3, orddict_cons/3]).
-export([get_options/2]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
@@ -179,7 +178,6 @@
-> rabbit_types:ok_or_error({file:filename(), file:filename(), any()})).
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
--spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok').
-spec(get_options/2 :: ([optdef()], [string()])
-> {[string()], [{string(), any()}]}).
-spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
@@ -755,12 +753,6 @@ dict_cons(Key, Value, Dict) ->
orddict_cons(Key, Value, Dict) ->
orddict:update(Key, fun (List) -> [Value | List] end, [Value], Dict).
-unlink_and_capture_exit(Pid) ->
- unlink(Pid),
- receive {'EXIT', Pid, _} -> ok
- after 0 -> ok
- end.
-
%% Separate flags and options from arguments.
%% get_options([{flag, "-q"}, {option, "-p", "/"}],
%% ["set_permissions","-p","/","guest",
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index bb26de64..3f4162cd 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -21,7 +21,7 @@
-export([start_link/4, successfully_recovered_state/1,
client_init/4, client_terminate/1, client_delete_and_terminate/1,
client_ref/1, close_all_indicated/1,
- write/3, read/2, contains/2, remove/2, release/2, sync/3]).
+ write/3, read/2, contains/2, remove/2, sync/3]).
-export([sync/1, set_maximum_since_use/2,
has_readers/2, combine_files/3, delete_file/2]). %% internal
@@ -67,7 +67,6 @@
gc_pid, %% pid of our GC
file_handles_ets, %% tid of the shared file handles table
file_summary_ets, %% tid of the file summary table
- dedup_cache_ets, %% tid of dedup cache table
cur_file_cache_ets, %% tid of current file cache table
dying_clients, %% set of dying clients
clients, %% map of references of all registered clients
@@ -87,7 +86,6 @@
gc_pid,
file_handles_ets,
file_summary_ets,
- dedup_cache_ets,
cur_file_cache_ets
}).
@@ -130,7 +128,6 @@
gc_pid :: pid(),
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
- dedup_cache_ets :: ets:tid(),
cur_file_cache_ets :: ets:tid()}).
-type(msg_ref_delta_gen(A) ::
fun ((A) -> 'finished' |
@@ -153,7 +150,6 @@
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
-spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()).
-spec(remove/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok').
--spec(release/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok').
-spec(sync/3 ::
([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok').
@@ -396,7 +392,7 @@ successfully_recovered_state(Server) ->
client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
{IState, IModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
+ FileHandlesEts, FileSummaryEts, CurFileCacheEts} =
gen_server2:call(
Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
#client_msstate { server = Server,
@@ -408,7 +404,6 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }.
client_terminate(CState = #client_msstate { client_ref = Ref }) ->
@@ -429,27 +424,16 @@ write(MsgId, Msg,
ok = server_cast(CState, {write, CRef, MsgId}).
read(MsgId,
- CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
- %% 1. Check the dedup cache
- case fetch_and_increment_cache(DedupCacheEts, MsgId) of
- not_found ->
- %% 2. Check the cur file cache
- case ets:lookup(CurFileCacheEts, MsgId) of
- [] ->
- Defer = fun() ->
- {server_call(CState, {read, MsgId}), CState}
- end,
- case index_lookup_positive_ref_count(MsgId, CState) of
- not_found -> Defer();
- MsgLocation -> client_read1(MsgLocation, Defer, CState)
- end;
- [{MsgId, Msg, _CacheRefCount}] ->
- %% Although we've found it, we don't know the
- %% refcount, so can't insert into dedup cache
- {{ok, Msg}, CState}
+ CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ %% Check the cur file cache
+ case ets:lookup(CurFileCacheEts, MsgId) of
+ [] ->
+ Defer = fun() -> {server_call(CState, {read, MsgId}), CState} end,
+ case index_lookup_positive_ref_count(MsgId, CState) of
+ not_found -> Defer();
+ MsgLocation -> client_read1(MsgLocation, Defer, CState)
end;
- Msg ->
+ [{MsgId, Msg, _CacheRefCount}] ->
{{ok, Msg}, CState}
end.
@@ -457,8 +441,6 @@ contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
remove([], _CState) -> ok;
remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
server_cast(CState, {remove, CRef, MsgIds}).
-release([], _CState) -> ok;
-release(MsgIds, CState) -> server_cast(CState, {release, MsgIds}).
sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}).
sync(Server) ->
@@ -517,7 +499,6 @@ client_read2(false, _Right,
client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
CState = #client_msstate { file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
gc_pid = GCPid,
client_ref = Ref }) ->
Release =
@@ -574,8 +555,8 @@ client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
%% Could the msg_store now mark the file to be
%% closed? No: marks for closing are issued only
%% when the msg_store has locked the file.
- {Msg, CState2} = %% This will never be the current file
- read_from_disk(MsgLocation, CState1, DedupCacheEts),
+ %% This will never be the current file
+ {Msg, CState2} = read_from_disk(MsgLocation, CState1),
Release(), %% this MUST NOT fail with badarg
{{ok, Msg}, CState2};
#msg_location {} = MsgLocation -> %% different file!
@@ -639,7 +620,6 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
%% CleanShutdown <=> msg location index and file_summary both
%% recovered correctly.
- DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]),
FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
[ordered_set, public]),
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
@@ -669,7 +649,6 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
dying_clients = sets:new(),
clients = Clients,
@@ -720,14 +699,12 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
index_module = IndexModule,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
clients = Clients,
gc_pid = GCPid }) ->
Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
- reply({IndexState, IndexModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
- State #msstate { clients = Clients1 });
+ reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
+ CurFileCacheEts}, State #msstate { clients = Clients1 });
handle_call({client_terminate, CRef}, _From, State) ->
reply(ok, clear_client(CRef, State));
@@ -781,12 +758,6 @@ handle_cast({remove, CRef, MsgIds}, State) ->
noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds),
removed, State1)));
-handle_cast({release, MsgIds}, State =
- #msstate { dedup_cache_ets = DedupCacheEts }) ->
- lists:foreach(
- fun (MsgId) -> decrement_cache(DedupCacheEts, MsgId) end, MsgIds),
- noreply(State);
-
handle_cast({sync, MsgIds, K},
State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
@@ -840,7 +811,6 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
clients = Clients,
dir = Dir }) ->
@@ -856,7 +826,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
State3 = close_all_handles(State1),
ok = store_file_summary(FileSummaryEts, Dir),
[true = ets:delete(T) ||
- T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]],
+ T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]],
IndexModule:terminate(IndexState),
ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
{index_module, IndexModule}], Dir),
@@ -978,26 +948,18 @@ write_message(MsgId, Msg,
sum_valid_data = SumValid + TotalSize,
sum_file_size = SumFileSize + TotalSize }).
-read_message(MsgId, From,
- State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
+read_message(MsgId, From, State) ->
case index_lookup_positive_ref_count(MsgId, State) of
- not_found ->
- gen_server2:reply(From, not_found),
- State;
- MsgLocation ->
- case fetch_and_increment_cache(DedupCacheEts, MsgId) of
- not_found -> read_message1(From, MsgLocation, State);
- Msg -> gen_server2:reply(From, {ok, Msg}),
- State
- end
+ not_found -> gen_server2:reply(From, not_found),
+ State;
+ MsgLocation -> read_message1(From, MsgLocation, State)
end.
-read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
- file = File, offset = Offset } = MsgLoc,
+read_message1(From, #msg_location { msg_id = MsgId, file = File,
+ offset = Offset } = MsgLoc,
State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }) ->
case File =:= CurFile of
true -> {Msg, State1} =
@@ -1010,10 +972,8 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
true -> file_handle_cache:flush(CurHdl);
false -> ok
end,
- read_from_disk(MsgLoc, State, DedupCacheEts);
+ read_from_disk(MsgLoc, State);
[{MsgId, Msg1, _CacheRefCount}] ->
- ok = maybe_insert_into_cache(
- DedupCacheEts, RefCount, MsgId, Msg1),
{Msg1, State}
end,
gen_server2:reply(From, {ok, Msg}),
@@ -1023,17 +983,14 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
case Locked of
true -> add_to_pending_gc_completion({read, MsgId, From},
File, State);
- false -> {Msg, State1} =
- read_from_disk(MsgLoc, State, DedupCacheEts),
+ false -> {Msg, State1} = read_from_disk(MsgLoc, State),
gen_server2:reply(From, {ok, Msg}),
State1
end
end.
-read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount,
- file = File, offset = Offset,
- total_size = TotalSize },
- State, DedupCacheEts) ->
+read_from_disk(#msg_location { msg_id = MsgId, file = File, offset = Offset,
+ total_size = TotalSize }, State) ->
{Hdl, State1} = get_read_handle(File, State),
{ok, Offset} = file_handle_cache:position(Hdl, Offset),
{ok, {MsgId, Msg}} =
@@ -1049,7 +1006,6 @@ read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount,
{proc_dict, get()}
]}}
end,
- ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg),
{Msg, State1}.
contains_message(MsgId, From,
@@ -1068,8 +1024,7 @@ contains_message(MsgId, From,
end.
remove_message(MsgId, CRef,
- State = #msstate { file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts }) ->
+ State = #msstate { file_summary_ets = FileSummaryEts }) ->
case should_mask_action(CRef, MsgId, State) of
{true, _Location} ->
State;
@@ -1090,8 +1045,7 @@ remove_message(MsgId, CRef,
%% don't remove from CUR_FILE_CACHE_ETS_NAME here
%% because there may be further writes in the mailbox
%% for the same msg.
- 1 -> ok = remove_cache_entry(DedupCacheEts, MsgId),
- case ets:lookup(FileSummaryEts, File) of
+ 1 -> case ets:lookup(FileSummaryEts, File) of
[#file_summary { locked = true }] ->
add_to_pending_gc_completion(
{remove, MsgId, CRef}, File, State);
@@ -1101,8 +1055,7 @@ remove_message(MsgId, CRef,
File, adjust_valid_total_size(File, -TotalSize,
State))
end;
- _ -> ok = decrement_cache(DedupCacheEts, MsgId),
- ok = Dec(),
+ _ -> ok = Dec(),
State
end
end.
@@ -1325,12 +1278,6 @@ list_sorted_file_names(Dir, Ext) ->
%% message cache helper functions
%%----------------------------------------------------------------------------
-maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg)
- when RefCount > 1 ->
- update_msg_cache(DedupCacheEts, MsgId, Msg);
-maybe_insert_into_cache(_DedupCacheEts, _RefCount, _MsgId, _Msg) ->
- ok.
-
update_msg_cache(CacheEts, MsgId, Msg) ->
case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of
true -> ok;
@@ -1339,34 +1286,6 @@ update_msg_cache(CacheEts, MsgId, Msg) ->
fun () -> update_msg_cache(CacheEts, MsgId, Msg) end)
end.
-remove_cache_entry(DedupCacheEts, MsgId) ->
- true = ets:delete(DedupCacheEts, MsgId),
- ok.
-
-fetch_and_increment_cache(DedupCacheEts, MsgId) ->
- case ets:lookup(DedupCacheEts, MsgId) of
- [] ->
- not_found;
- [{_MsgId, Msg, _RefCount}] ->
- safe_ets_update_counter_ok(
- DedupCacheEts, MsgId, {3, +1},
- %% someone has deleted us in the meantime, insert us
- fun () -> ok = update_msg_cache(DedupCacheEts, MsgId, Msg) end),
- Msg
- end.
-
-decrement_cache(DedupCacheEts, MsgId) ->
- true = safe_ets_update_counter(
- DedupCacheEts, MsgId, {3, -1},
- fun (N) when N =< 0 -> true = ets:delete(DedupCacheEts, MsgId);
- (_N) -> true
- end,
- %% MsgId is not in there because although it's been
- %% delivered, it's never actually been read (think:
- %% persistent message held in RAM)
- fun () -> true end),
- ok.
-
%%----------------------------------------------------------------------------
%% index
%%----------------------------------------------------------------------------
@@ -1592,8 +1511,8 @@ build_index(Gatherer, Left, [],
sum_file_size = SumFileSize }) ->
case gatherer:out(Gatherer) of
empty ->
+ unlink(Gatherer),
ok = gatherer:stop(Gatherer),
- ok = rabbit_misc:unlink_and_capture_exit(Gatherer),
ok = index_delete_by_file(undefined, State),
Offset = case ets:lookup(FileSummaryEts, Left) of
[] -> 0;
@@ -1972,7 +1891,10 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
force_recovery(BaseDir, Store) ->
Dir = filename:join(BaseDir, atom_to_list(Store)),
- ok = file:delete(filename:join(Dir, ?CLEAN_FILENAME)),
+ case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
+ ok -> ok;
+ {error, enoent} -> ok
+ end,
recover_crashed_compactions(BaseDir),
ok.
@@ -2007,7 +1929,10 @@ transform_msg_file(FileOld, FileNew, TransformFun) ->
rabbit_msg_file:scan(
RefOld, filelib:file_size(FileOld),
fun({MsgId, _Size, _Offset, BinMsg}, ok) ->
- {ok, MsgNew} = TransformFun(binary_to_term(BinMsg)),
+ {ok, MsgNew} = case binary_to_term(BinMsg) of
+ <<>> -> {ok, <<>>}; %% dying client marker
+ Msg -> TransformFun(Msg)
+ end,
{ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew),
ok
end, ok),
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 8800e8d6..79deb46c 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -258,8 +258,13 @@ duplicate_node_check(NodeStr) ->
terminate(?ERROR_CODE);
false -> ok
end;
- {error, EpmdReason} -> terminate("unexpected epmd error: ~p~n",
- [EpmdReason])
+ {error, EpmdReason} ->
+ terminate("epmd error for host ~p: ~p (~s)~n",
+ [NodeHost, EpmdReason,
+ case EpmdReason of
+ address -> "unable to establish tcp connection";
+ _ -> inet:format_error(EpmdReason)
+ end])
end.
terminate(Fmt, Args) ->
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 367953b8..aaf3df78 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -514,8 +514,8 @@ queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) ->
queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
case gatherer:out(Gatherer) of
empty ->
+ unlink(Gatherer),
ok = gatherer:stop(Gatherer),
- ok = rabbit_misc:unlink_and_capture_exit(Gatherer),
finished;
{value, {MsgId, Count}} ->
{MsgId, Count, {next, Gatherer}}
@@ -1036,8 +1036,8 @@ foreach_queue_index(Funs) ->
end)
end || QueueDirName <- QueueDirNames],
empty = gatherer:out(Gatherer),
- ok = gatherer:stop(Gatherer),
- ok = rabbit_misc:unlink_and_capture_exit(Gatherer).
+ unlink(Gatherer),
+ ok = gatherer:stop(Gatherer).
transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 609bb43f..42af91a8 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -681,7 +681,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
State#v1{connection_state = running,
connection = NewConnection}),
rabbit_event:notify(connection_created,
- infos(?CREATION_EVENT_KEYS, State1)),
+ [{type, network} |
+ infos(?CREATION_EVENT_KEYS, State1)]),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State1) end),
State1;
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index 1953b6b8..e0defa9e 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -89,8 +89,8 @@ find_by_type(Type, {rdnSequence, RDNs}) ->
case [V || #'AttributeTypeAndValue'{type = T, value = V}
<- lists:flatten(RDNs),
T == Type] of
- [{printableString, S}] -> S;
- [] -> not_found
+ [Val] -> format_asn1_value(Val);
+ [] -> not_found
end.
%%--------------------------------------------------------------------------
@@ -162,12 +162,85 @@ escape_rdn_value([C | S], middle) ->
format_asn1_value({ST, S}) when ST =:= teletexString; ST =:= printableString;
ST =:= universalString; ST =:= utf8String;
ST =:= bmpString ->
- if is_binary(S) -> binary_to_list(S);
- true -> S
- end;
+ format_directory_string(ST, S);
format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2,
Min1, Min2, S1, S2, $Z]}) ->
io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ",
[Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]);
format_asn1_value(V) ->
io_lib:format("~p", [V]).
+
+%% DirectoryString { INTEGER : maxSize } ::= CHOICE {
+%% teletexString TeletexString (SIZE (1..maxSize)),
+%% printableString PrintableString (SIZE (1..maxSize)),
+%% bmpString BMPString (SIZE (1..maxSize)),
+%% universalString UniversalString (SIZE (1..maxSize)),
+%% uTF8String UTF8String (SIZE (1..maxSize)) }
+%%
+%% Precise definitions of printable / teletexString are hard to come
+%% by. This is what I reconstructed:
+%%
+%% printableString:
+%% "intended to represent the limited character sets available to
+%% mainframe input terminals"
+%% A-Z a-z 0-9 ' ( ) + , - . / : = ? [space]
+%% http://msdn.microsoft.com/en-us/library/bb540814(v=vs.85).aspx
+%%
+%% teletexString:
+%% "a sizable volume of software in the world treats TeletexString
+%% (T61String) as a simple 8-bit string with mostly Windows Latin 1
+%% (superset of iso-8859-1) encoding"
+%% http://www.mail-archive.com/asn1@asn1.org/msg00460.html
+%%
+%% (However according to that link X.680 actually defines
+%% TeletexString in some much more involved and crazy way. I suggest
+%% we treat it as ISO-8859-1 since Erlang does not support Windows
+%% Latin 1).
+%%
+%% bmpString:
+%% UCS-2 according to RFC 3641. Hence cannot represent Unicode
+%% characters above 65535 (outside the "Basic Multilingual Plane").
+%%
+%% universalString:
+%% UCS-4 according to RFC 3641.
+%%
+%% utf8String:
+%% UTF-8 according to RFC 3641.
+%%
+%% Within Rabbit we assume UTF-8 encoding. Since printableString is a
+%% subset of ASCII it is also a subset of UTF-8. The others need
+%% converting. Fortunately since the Erlang SSL library does the
+%% decoding for us (albeit into a weird format, see below), we just
+%% need to handle encoding into UTF-8. Note also that utf8Strings come
+%% back as binary.
+%%
+%% Note for testing: the default Ubuntu configuration for openssl will
+%% only create printableString or teletexString types no matter what
+%% you do. Edit string_mask in the [req] section of
+%% /etc/ssl/openssl.cnf to change this (see comments there). You
+%% probably also need to set utf8 = yes to get it to accept UTF-8 on
+%% the command line. Also note I could not get openssl to generate a
+%% universalString.
+
+format_directory_string(printableString, S) -> S;
+format_directory_string(teletexString, S) -> utf8_list_from(S);
+format_directory_string(bmpString, S) -> utf8_list_from(S);
+format_directory_string(universalString, S) -> utf8_list_from(S);
+format_directory_string(utf8String, S) -> binary_to_list(S).
+
+utf8_list_from(S) ->
+ binary_to_list(
+ unicode:characters_to_binary(flatten_ssl_list(S), utf32, utf8)).
+
+%% The Erlang SSL implementation invents its own representation for
+%% non-ascii strings - looking like [97,{0,0,3,187}] (that's LATIN
+%% SMALL LETTER A followed by GREEK SMALL LETTER LAMDA). We convert
+%% this into a list of unicode characters, which we can tell
+%% unicode:characters_to_binary is utf32.
+
+flatten_ssl_list(L) -> [flatten_ssl_list_item(I) || I <- L].
+
+flatten_ssl_list_item({A, B, C, D}) ->
+ A * (1 bsl 24) + B * (1 bsl 16) + C * (1 bsl 8) + D;
+flatten_ssl_list_item(N) when is_number (N) ->
+ N.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c029412d..38492984 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1786,8 +1786,6 @@ test_msg_store() ->
true = msg_store_contains(true, MsgIds2ndHalf, MSCState2),
%% read the second half again
MSCState3 = msg_store_read(MsgIds2ndHalf, MSCState2),
- %% release the second half, just for fun (aka code coverage)
- ok = rabbit_msg_store:release(MsgIds2ndHalf, MSCState3),
%% read the second half again, just for fun (aka code coverage)
MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3),
ok = rabbit_msg_store:client_terminate(MSCState4),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7a1102e5..ff7252fd 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -728,7 +728,7 @@ requeue(AckTags, MsgPropsFun, State) ->
needs_confirming = false }
end,
a(reduce_memory_use(
- ack(fun msg_store_release/3,
+ ack(fun (_, _, _) -> ok end,
fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
{_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps),
true, false, State1),
@@ -972,11 +972,6 @@ msg_store_remove(MSCState, IsPersistent, MsgIds) ->
MSCState, IsPersistent,
fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end).
-msg_store_release(MSCState, IsPersistent, MsgIds) ->
- with_immutable_msg_store_state(
- MSCState, IsPersistent,
- fun (MCSState1) -> rabbit_msg_store:release(MsgIds, MCSState1) end).
-
msg_store_sync(MSCState, IsPersistent, MsgIds, Fun) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 1a240856..ec1ee9cd 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -38,6 +38,9 @@
%% child is a supervisor and it exits normally (i.e. with reason of
%% 'shutdown') then the child's parent also exits normally.
%%
+%% 5) normal, and {shutdown, _} exit reasons are all treated the same
+%% (i.e. are regarded as normal exits)
+%%
%% All modifications are (C) 2010-2011 VMware, Inc.
%%
%% %CopyrightBegin%
@@ -544,17 +547,12 @@ do_restart({RestartType, Delay}, Reason, Child, State) ->
do_restart(permanent, Reason, Child, State) ->
report_error(child_terminated, Reason, Child, State#state.name),
restart(Child, State);
-do_restart(intrinsic, normal, Child, State) ->
- {shutdown, state_del_child(Child, State)};
-do_restart(intrinsic, shutdown, Child = #child{child_type = supervisor},
- State) ->
- {shutdown, state_del_child(Child, State)};
-do_restart(_, normal, Child, State) ->
- NState = state_del_child(Child, State),
- {ok, NState};
-do_restart(_, shutdown, Child, State) ->
- NState = state_del_child(Child, State),
- {ok, NState};
+do_restart(Type, normal, Child, State) ->
+ del_child_and_maybe_shutdown(Type, Child, State);
+do_restart(Type, {shutdown, _}, Child, State) ->
+ del_child_and_maybe_shutdown(Type, Child, State);
+do_restart(Type, shutdown, Child = #child{child_type = supervisor}, State) ->
+ del_child_and_maybe_shutdown(Type, Child, State);
do_restart(Type, Reason, Child, State) when Type =:= transient orelse
Type =:= intrinsic ->
report_error(child_terminated, Reason, Child, State#state.name),
@@ -564,6 +562,11 @@ do_restart(temporary, Reason, Child, State) ->
NState = state_del_child(Child, State),
{ok, NState}.
+del_child_and_maybe_shutdown(intrinsic, Child, State) ->
+ {shutdown, state_del_child(Child, State)};
+del_child_and_maybe_shutdown(_, Child, State) ->
+ {ok, state_del_child(Child, State)}.
+
restart(Child, State) ->
case add_restart(State) of
{ok, NState} ->
diff --git a/src/test_sup.erl b/src/test_sup.erl
index b4df1fd0..150235da 100644
--- a/src/test_sup.erl
+++ b/src/test_sup.erl
@@ -45,8 +45,8 @@ test_supervisor_delayed_restart(SupPid) ->
with_sup(RestartStrategy, Fun) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, [RestartStrategy]),
Res = Fun(SupPid),
+ unlink(SupPid),
exit(SupPid, shutdown),
- rabbit_misc:unlink_and_capture_exit(SupPid),
Res.
init([RestartStrategy]) ->