summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-03-23 16:06:15 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-03-23 16:06:15 +0000
commitfb7e672093aa37b0c0e65347dc65875584e76070 (patch)
treefac1bcfe14b8984452c413bc747f954637eb6906
parentc63dcaa034093cd1dc217c06c102127d18ac524f (diff)
parent02a4098c915add7c5f9b9002cf5ff0d6783e091d (diff)
downloadrabbitmq-server-fb7e672093aa37b0c0e65347dc65875584e76070.tar.gz
Merge heads
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--packaging/debs/Debian/debian/control5
-rw-r--r--src/file_handle_cache.erl16
-rw-r--r--src/rabbit.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl80
-rw-r--r--src/rabbit_binding.erl23
-rw-r--r--src/rabbit_mnesia.erl25
-rw-r--r--src/rabbit_msg_store.erl48
-rw-r--r--src/rabbit_prelaunch.erl4
-rw-r--r--src/rabbit_tests.erl47
-rw-r--r--src/rabbit_upgrade.erl55
-rw-r--r--src/rabbit_variable_queue.erl11
13 files changed, 185 insertions, 141 deletions
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index ae9b2059..45af770a 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -120,6 +120,9 @@ done
rm -rf %{buildroot}
%changelog
+* Tue Mar 22 2011 Alexandru Scvortov <alexandru@rabbitmq.com> 2.4.0-1
+- New Upstream Release
+
* Thu Feb 3 2011 simon@rabbitmq.com 2.3.1-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 12165dc0..2ca5074f 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (2.4.0-1) lucid; urgency=low
+
+ * New Upstream Release
+
+ -- Alexandru Scvortov <alexandru@rabbitmq.com> Tue, 22 Mar 2011 17:34:31 +0000
+
rabbitmq-server (2.3.1-1) lucid; urgency=low
* New Upstream Release
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index 02da0cc6..45f5c5c4 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -7,10 +7,7 @@ Standards-Version: 3.8.0
Package: rabbitmq-server
Architecture: all
-# erlang-inets is not a strict dependency, but it's needed to allow
-# the installation of plugins that use mochiweb. Ideally it would be a
-# "Recommends" instead, but gdebi does not install those.
-Depends: erlang-base (>= 1:12.b.3) | erlang-base-hipe (>= 1:12.b.3), erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), erlang-inets | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends}
+Depends: erlang-nox (>= 1:12.b.3), adduser, logrotate, ${misc:Depends}
Description: An AMQP server written in Erlang
RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 4f036571..61b08d49 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -970,12 +970,13 @@ queue_fold(Fun, Init, Q) ->
filter_pending(Fun, {Count, Queue}) ->
{Delta, Queue1} =
- queue_fold(fun (Item, {DeltaN, QueueN}) ->
- case Fun(Item) of
- true -> {DeltaN, queue:in(Item, QueueN)};
- false -> {DeltaN - requested(Item), QueueN}
- end
- end, {0, queue:new()}, Queue),
+ queue_fold(
+ fun (Item = #pending { requested = Requested }, {DeltaN, QueueN}) ->
+ case Fun(Item) of
+ true -> {DeltaN, queue:in(Item, QueueN)};
+ false -> {DeltaN - Requested, QueueN}
+ end
+ end, {0, queue:new()}, Queue),
{Count + Delta, Queue1}.
pending_new() ->
@@ -1021,9 +1022,6 @@ adjust_alarm(OldState, NewState) ->
end,
NewState.
-requested({_Kind, _Pid, Requested, _From}) ->
- Requested.
-
process_pending(State = #fhc_state { limit = infinity }) ->
State;
process_pending(State) ->
diff --git a/src/rabbit.erl b/src/rabbit.erl
index e60886fa..807e9e7d 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -192,7 +192,8 @@
%%----------------------------------------------------------------------------
prepare() ->
- ok = ensure_working_log_handlers().
+ ok = ensure_working_log_handlers(),
+ ok = rabbit_upgrade:maybe_upgrade_mnesia().
start() ->
try
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7c4b5190..3f5758ce 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -439,19 +439,26 @@ gb_trees_cons(Key, Value, Tree) ->
none -> gb_trees:insert(Key, [Value], Tree)
end.
-record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
- {never, State};
-record_confirm_message(#delivery{sender = ChPid,
+should_confirm_message(#delivery{msg_seq_no = undefined}, _State) ->
+ never;
+should_confirm_message(#delivery{sender = ChPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
is_persistent = true,
id = MsgId}},
- State = #q{q = #amqqueue{durable = true},
- msg_id_to_channel = MTC}) ->
- {eventually,
- State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)}};
-record_confirm_message(_Delivery, State) ->
- {immediately, State}.
+ #q{q = #amqqueue{durable = true}}) ->
+ {eventually, ChPid, MsgSeqNo, MsgId};
+should_confirm_message(_Delivery, _State) ->
+ immediately.
+
+needs_confirming({eventually, _, _, _}) -> true;
+needs_confirming(_) -> false.
+
+maybe_record_confirm_message({eventually, ChPid, MsgSeqNo, MsgId},
+ State = #q{msg_id_to_channel = MTC}) ->
+ State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)};
+maybe_record_confirm_message(_Confirm, State) ->
+ State.
run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
@@ -465,9 +472,10 @@ run_message_queue(State) ->
attempt_delivery(#delivery{txn = none,
sender = ChPid,
message = Message,
- msg_seq_no = MsgSeqNo},
- {NeedsConfirming, State = #q{backing_queue = BQ}}) ->
- case NeedsConfirming of
+ msg_seq_no = MsgSeqNo} = Delivery,
+ State = #q{backing_queue = BQ}) ->
+ Confirm = should_confirm_message(Delivery, State),
+ case Confirm of
immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
_ -> ok
end,
@@ -481,36 +489,36 @@ attempt_delivery(#delivery{txn = none,
BQ:publish_delivered(
AckRequired, Message,
(?BASE_MESSAGE_PROPERTIES)#message_properties{
- needs_confirming = (NeedsConfirming =:= eventually)},
+ needs_confirming = needs_confirming(Confirm)},
BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
{Delivered, State1} =
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State),
- {Delivered, NeedsConfirming, State1};
+ {Delivered, Confirm, State1};
attempt_delivery(#delivery{txn = Txn,
sender = ChPid,
- message = Message},
- {NeedsConfirming, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}}) ->
+ message = Message} = Delivery,
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS),
- {true, NeedsConfirming, State#q{backing_queue_state = BQS1}}.
-
-deliver_or_enqueue(Delivery, State) ->
- case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
- {true, _, State1} ->
- State1;
- {false, NeedsConfirming, State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}} ->
- #delivery{message = Message} = Delivery,
- BQS1 = BQ:publish(Message,
- (message_properties(State)) #message_properties{
- needs_confirming =
- (NeedsConfirming =:= eventually)},
- BQS),
- ensure_ttl_timer(State1#q{backing_queue_state = BQS1})
+ {true, should_confirm_message(Delivery, State),
+ State#q{backing_queue_state = BQS1}}.
+
+deliver_or_enqueue(Delivery = #delivery{message = Message}, State) ->
+ {Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
+ State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ maybe_record_confirm_message(Confirm, State1),
+ case Delivered of
+ true -> State2;
+ false -> BQS1 =
+ BQ:publish(Message,
+ (message_properties(State)) #message_properties{
+ needs_confirming = needs_confirming(Confirm)},
+ BQS),
+ ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
@@ -829,9 +837,11 @@ handle_call({deliver_immediately, Delivery}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, _NeedsConfirming, State1} =
- attempt_delivery(Delivery, record_confirm_message(Delivery, State)),
- reply(Delivered, State1);
+ {Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
+ reply(Delivered, case Delivered of
+ true -> maybe_record_confirm_message(Confirm, State1);
+ false -> State1
+ end);
handle_call({deliver, Delivery}, From, State) ->
%% Synchronous, "mandatory" delivery mode. Reply asap.
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 7ddb7814..6167790e 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -331,17 +331,18 @@ group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) ->
group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed).
maybe_auto_delete(XName, Bindings, Deletions) ->
- case mnesia:read({rabbit_exchange, XName}) of
- [] ->
- add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions);
- [X] ->
- add_deletion(XName, {X, not_deleted, Bindings},
- case rabbit_exchange:maybe_auto_delete(X) of
- not_deleted -> Deletions;
- {deleted, Deletions1} -> combine_deletions(
- Deletions, Deletions1)
- end)
- end.
+ {Entry, Deletions1} =
+ case mnesia:read({rabbit_exchange, XName}) of
+ [] -> {{undefined, not_deleted, Bindings}, Deletions};
+ [X] -> case rabbit_exchange:maybe_auto_delete(X) of
+ not_deleted ->
+ {{X, not_deleted, Bindings}, Deletions};
+ {deleted, Deletions2} ->
+ {{X, deleted, Bindings},
+ combine_deletions(Deletions, Deletions2)}
+ end
+ end,
+ add_deletion(XName, Entry, Deletions1).
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index e661e5e3..9ca52327 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -88,8 +88,8 @@ status() ->
{running_nodes, running_clustered_nodes()}].
init() ->
- ok = ensure_mnesia_running(),
- ok = ensure_mnesia_dir(),
+ ensure_mnesia_running(),
+ ensure_mnesia_dir(),
ok = init_db(read_cluster_nodes_config(), true, true),
ok.
@@ -108,8 +108,8 @@ force_cluster(ClusterNodes) ->
%% node. If Force is false, only connections to online nodes are
%% allowed.
cluster(ClusterNodes, Force) ->
- ok = ensure_mnesia_not_running(),
- ok = ensure_mnesia_dir(),
+ ensure_mnesia_not_running(),
+ ensure_mnesia_dir(),
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
try
ok = init_db(ClusterNodes, Force, true),
@@ -434,8 +434,9 @@ init_db(ClusterNodes, Force, DoSecondaryLocalUpgrades) ->
%% We're the first node up
case rabbit_upgrade:maybe_upgrade_local() of
ok -> ensure_schema_integrity();
- version_not_available -> schema_ok_or_move()
- end;
+ version_not_available -> ok = schema_ok_or_move()
+ end,
+ ok;
{[AnotherNode|_], _} ->
%% Subsequent node in cluster, catch up
ensure_version_ok(
@@ -459,7 +460,8 @@ init_db(ClusterNodes, Force, DoSecondaryLocalUpgrades) ->
end;
false -> ok
end,
- ensure_schema_integrity()
+ ensure_schema_integrity(),
+ ok
end;
{error, Reason} ->
%% one reason we may end up here is if we try to join
@@ -499,7 +501,7 @@ create_schema() ->
rabbit_misc:ensure_ok(mnesia:start(),
cannot_start_mnesia),
ok = create_tables(),
- ok = ensure_schema_integrity(),
+ ensure_schema_integrity(),
ok = rabbit_version:record_desired().
move_db() ->
@@ -520,11 +522,12 @@ move_db() ->
{error, Reason} -> throw({error, {cannot_backup_mnesia,
MnesiaDir, BackupDir, Reason}})
end,
- ok = ensure_mnesia_dir(),
+ ensure_mnesia_dir(),
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
ok.
copy_db(Destination) ->
+ ok = ensure_mnesia_not_running(),
rabbit_misc:recursive_copy(dir(), Destination).
create_tables() ->
@@ -599,12 +602,12 @@ wait_for_tables(TableNames) ->
end.
reset(Force) ->
- ok = ensure_mnesia_not_running(),
+ ensure_mnesia_not_running(),
Node = node(),
case Force of
true -> ok;
false ->
- ok = ensure_mnesia_dir(),
+ ensure_mnesia_dir(),
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
{Nodes, RunningNodes} =
try
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index a08bbd70..bb26de64 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -850,16 +850,16 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
State1 = case CurHdl of
undefined -> State;
_ -> State2 = internal_sync(State),
- file_handle_cache:close(CurHdl),
+ ok = file_handle_cache:close(CurHdl),
State2
end,
State3 = close_all_handles(State1),
- store_file_summary(FileSummaryEts, Dir),
- [ets:delete(T) ||
+ ok = store_file_summary(FileSummaryEts, Dir),
+ [true = ets:delete(T) ||
T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]],
IndexModule:terminate(IndexState),
- store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
- {index_module, IndexModule}], Dir),
+ ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
+ {index_module, IndexModule}], Dir),
State3 #msstate { index_state = undefined,
current_file_handle = undefined }.
@@ -912,13 +912,16 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
false -> [{CRef, MsgIds} | NS]
end
end, [], CTM),
- case {Syncs, CGs} of
- {[], []} -> ok;
- _ -> file_handle_cache:sync(CurHdl)
- end,
+ ok = case {Syncs, CGs} of
+ {[], []} -> ok;
+ _ -> file_handle_cache:sync(CurHdl)
+ end,
[K() || K <- lists:reverse(Syncs)],
- [client_confirm(CRef, MsgIds, written, State1) || {CRef, MsgIds} <- CGs],
- State1 #msstate { cref_to_msg_ids = dict:new(), on_sync = [] }.
+ State2 = lists:foldl(
+ fun ({CRef, MsgIds}, StateN) ->
+ client_confirm(CRef, MsgIds, written, StateN)
+ end, State1, CGs),
+ State2 #msstate { on_sync = [] }.
write_action({true, not_found}, _MsgId, State) ->
{ignore, undefined, State};
@@ -1147,7 +1150,7 @@ orddict_store(Key, Val, Dict) ->
orddict:store(Key, Val, Dict).
update_pending_confirms(Fun, CRef,
- State = #msstate { clients = Clients,
+ State = #msstate { clients = Clients,
cref_to_msg_ids = CTM }) ->
case dict:fetch(CRef, Clients) of
{undefined, _CloseFDsFun} -> State;
@@ -1466,7 +1469,7 @@ recover_file_summary(false, _Dir) ->
recover_file_summary(true, Dir) ->
Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
case ets:file2tab(Path) of
- {ok, Tid} -> file:delete(Path),
+ {ok, Tid} -> ok = file:delete(Path),
{true, Tid};
{error, _Error} -> recover_file_summary(false, Dir)
end.
@@ -1533,9 +1536,7 @@ scan_file_for_valid_messages(Dir, FileName) ->
Hdl, filelib:file_size(
form_filename(Dir, FileName)),
fun scan_fun/2, []),
- %% if something really bad has happened,
- %% the close could fail, but ignore
- file_handle_cache:close(Hdl),
+ ok = file_handle_cache:close(Hdl),
Valid;
{error, enoent} -> {ok, [], 0};
{error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
@@ -1971,32 +1972,33 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
force_recovery(BaseDir, Store) ->
Dir = filename:join(BaseDir, atom_to_list(Store)),
- file:delete(filename:join(Dir, ?CLEAN_FILENAME)),
+ ok = file:delete(filename:join(Dir, ?CLEAN_FILENAME)),
recover_crashed_compactions(BaseDir),
ok.
foreach_file(D, Fun, Files) ->
- [Fun(filename:join(D, File)) || File <- Files].
+ [ok = Fun(filename:join(D, File)) || File <- Files].
foreach_file(D1, D2, Fun, Files) ->
- [Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files].
+ [ok = Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files].
transform_dir(BaseDir, Store, TransformFun) ->
Dir = filename:join(BaseDir, atom_to_list(Store)),
TmpDir = filename:join(Dir, ?TRANSFORM_TMP),
TransformFile = fun (A, B) -> transform_msg_file(A, B, TransformFun) end,
+ CopyFile = fun (Src, Dst) -> {ok, _Bytes} = file:copy(Src, Dst), ok end,
case filelib:is_dir(TmpDir) of
true -> throw({error, transform_failed_previously});
false -> FileList = list_sorted_file_names(Dir, ?FILE_EXTENSION),
foreach_file(Dir, TmpDir, TransformFile, FileList),
foreach_file(Dir, fun file:delete/1, FileList),
- foreach_file(TmpDir, Dir, fun file:copy/2, FileList),
+ foreach_file(TmpDir, Dir, CopyFile, FileList),
foreach_file(TmpDir, fun file:delete/1, FileList),
ok = file:del_dir(TmpDir)
end.
transform_msg_file(FileOld, FileNew, TransformFun) ->
- rabbit_misc:ensure_parent_dirs_exist(FileNew),
+ ok = rabbit_misc:ensure_parent_dirs_exist(FileNew),
{ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []),
{ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write],
[{write_buffer,
@@ -2009,6 +2011,6 @@ transform_msg_file(FileOld, FileNew, TransformFun) ->
{ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew),
ok
end, ok),
- file_handle_cache:close(RefOld),
- file_handle_cache:close(RefNew),
+ ok = file_handle_cache:close(RefOld),
+ ok = file_handle_cache:close(RefNew),
ok.
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 92ad6a24..8800e8d6 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -235,10 +235,8 @@ post_process_script(ScriptFile) ->
{error, {failed_to_load_script, Reason}}
end.
-process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) ->
- [{apply,{rabbit,prepare,[]}}, Entry];
process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) ->
- [{apply,{rabbit_upgrade,maybe_upgrade_mnesia,[]}}, Entry];
+ [{apply,{rabbit,prepare,[]}}, Entry];
process_entry(Entry) ->
[Entry].
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b8c3f4a9..ca046c91 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1630,23 +1630,42 @@ test_file_handle_cache() ->
ok = file_handle_cache:set_limit(5), %% 1 or 2 sockets, 2 msg_stores
TmpDir = filename:join(rabbit_mnesia:dir(), "tmp"),
ok = filelib:ensure_dir(filename:join(TmpDir, "nothing")),
+ [Src1, Dst1, Src2, Dst2] = Files =
+ [filename:join(TmpDir, Str) || Str <- ["file1", "file2", "file3", "file4"]],
+ Content = <<"foo">>,
+ CopyFun = fun (Src, Dst) ->
+ ok = file:write_file(Src, Content),
+ {ok, SrcHdl} = file_handle_cache:open(Src, [read], []),
+ {ok, DstHdl} = file_handle_cache:open(Dst, [write], []),
+ Size = size(Content),
+ {ok, Size} = file_handle_cache:copy(SrcHdl, DstHdl, Size),
+ ok = file_handle_cache:delete(SrcHdl),
+ ok = file_handle_cache:delete(DstHdl)
+ end,
Pid = spawn(fun () -> {ok, Hdl} = file_handle_cache:open(
- filename:join(TmpDir, "file3"),
+ filename:join(TmpDir, "file5"),
[write], []),
- receive close -> ok end,
- file_handle_cache:delete(Hdl)
+ receive {next, Pid1} -> Pid1 ! {next, self()} end,
+ file_handle_cache:delete(Hdl),
+ %% This will block and never return, so we
+ %% exercise the fhc tidying up the pending
+ %% queue on the death of a process.
+ ok = CopyFun(Src1, Dst1)
end),
- Src = filename:join(TmpDir, "file1"),
- Dst = filename:join(TmpDir, "file2"),
- Content = <<"foo">>,
- ok = file:write_file(Src, Content),
- {ok, SrcHdl} = file_handle_cache:open(Src, [read], []),
- {ok, DstHdl} = file_handle_cache:open(Dst, [write], []),
- Size = size(Content),
- {ok, Size} = file_handle_cache:copy(SrcHdl, DstHdl, Size),
- ok = file_handle_cache:delete(SrcHdl),
- file_handle_cache:delete(DstHdl),
- Pid ! close,
+ ok = CopyFun(Src1, Dst1),
+ ok = file_handle_cache:set_limit(2),
+ Pid ! {next, self()},
+ receive {next, Pid} -> ok end,
+ timer:sleep(100),
+ Pid1 = spawn(fun () -> CopyFun(Src2, Dst2) end),
+ timer:sleep(100),
+ erlang:monitor(process, Pid),
+ erlang:monitor(process, Pid1),
+ exit(Pid, kill),
+ exit(Pid1, kill),
+ receive {'DOWN', _MRef, process, Pid, _Reason} -> ok end,
+ receive {'DOWN', _MRef1, process, Pid1, _Reason1} -> ok end,
+ [file:delete(File) || File <- Files],
ok = file_handle_cache:set_limit(Limit),
passed.
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 244be522..3981b173 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -91,7 +91,7 @@
%% -------------------------------------------------------------------
-ensure_backup() ->
+ensure_backup_taken() ->
case filelib:is_file(lock_filename()) of
false -> case filelib:is_dir(backup_dir()) of
false -> ok = take_backup();
@@ -101,7 +101,6 @@ ensure_backup() ->
end.
take_backup() ->
- rabbit:prepare(), %% Ensure we have logs for this
BackupDir = backup_dir(),
case rabbit_mnesia:copy_db(BackupDir) of
ok -> info("upgrades: Mnesia dir backed up to ~p~n",
@@ -109,7 +108,7 @@ take_backup() ->
{error, E} -> throw({could_not_back_up_mnesia_dir, E})
end.
-maybe_remove_backup() ->
+ensure_backup_removed() ->
case filelib:is_dir(backup_dir()) of
true -> ok = remove_backup();
_ -> ok
@@ -134,11 +133,11 @@ maybe_upgrade_mnesia() ->
{ok, []} ->
ok;
{ok, Upgrades} ->
- rabbit:prepare(), %% Ensure we have logs for this
- case upgrade_mode(AllNodes) of
- primary -> primary_upgrade(Upgrades, AllNodes);
- secondary -> secondary_upgrade(AllNodes)
- end
+ ensure_backup_taken(),
+ ok = case upgrade_mode(AllNodes) of
+ primary -> primary_upgrade(Upgrades, AllNodes);
+ secondary -> secondary_upgrade(AllNodes)
+ end
end.
upgrade_mode(AllNodes) ->
@@ -200,30 +199,32 @@ die(Msg, Args) ->
primary_upgrade(Upgrades, Nodes) ->
Others = Nodes -- [node()],
- apply_upgrades(
- mnesia,
- Upgrades,
- fun () ->
- force_tables(),
- case Others of
- [] -> ok;
- _ -> info("mnesia upgrades: Breaking cluster~n", []),
- [{atomic, ok} = mnesia:del_table_copy(schema, Node)
- || Node <- Others]
- end
- end),
+ ok = apply_upgrades(
+ mnesia,
+ Upgrades,
+ fun () ->
+ force_tables(),
+ case Others of
+ [] -> ok;
+ _ -> info("mnesia upgrades: Breaking cluster~n", []),
+ [{atomic, ok} = mnesia:del_table_copy(schema, Node)
+ || Node <- Others]
+ end
+ end),
ok.
force_tables() ->
[mnesia:force_load_table(T) || T <- rabbit_mnesia:table_names()].
secondary_upgrade(AllNodes) ->
+ %% must do this before we wipe out schema
+ IsDiscNode = is_disc_node(),
rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
cannot_delete_schema),
%% Note that we cluster with all nodes, rather than all disc nodes
%% (as we can't know all disc nodes at this point). This is safe as
%% we're not writing the cluster config, just setting up Mnesia.
- ClusterNodes = case is_disc_node() of
+ ClusterNodes = case IsDiscNode of
true -> AllNodes;
false -> AllNodes -- [node()]
end,
@@ -247,17 +248,19 @@ maybe_upgrade_local() ->
case rabbit_version:upgrades_required(local) of
{error, version_not_available} -> version_not_available;
{error, _} = Err -> throw(Err);
- {ok, []} -> maybe_remove_backup();
+ {ok, []} -> ensure_backup_removed(),
+ ok;
{ok, Upgrades} -> mnesia:stop(),
- apply_upgrades(local, Upgrades,
- fun () -> ok end),
- maybe_remove_backup()
+ ensure_backup_taken(),
+ ok = apply_upgrades(local, Upgrades,
+ fun () -> ok end),
+ ensure_backup_removed(),
+ ok
end.
%% -------------------------------------------------------------------
apply_upgrades(Scope, Upgrades, Fun) ->
- ensure_backup(),
ok = rabbit_misc:lock_file(lock_filename()),
info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]),
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 6e3460c5..7a1102e5 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -150,10 +150,13 @@
%% responsive.
%%
%% In the queue we keep track of both messages that are pending
-%% delivery and messages that are pending acks. This ensures that
-%% purging (deleting the former) and deletion (deleting the former and
-%% the latter) are both cheap and do require any scanning through qi
-%% segments.
+%% delivery and messages that are pending acks. In the event of a
+%% queue purge, we only need to load qi segments if the queue has
+%% elements in deltas (i.e. it came under significant memory
+%% pressure). In the event of a queue deletion, in addition to the
+%% preceding, by keeping track of pending acks in RAM, we do not need
+%% to search through qi segments looking for messages that are yet to
+%% be acknowledged.
%%
%% Pending acks are recorded in memory either as the tuple {SeqId,
%% MsgId, MsgProps} (tuple-form) or as the message itself (message-