diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-03-23 16:06:15 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-03-23 16:06:15 +0000 |
commit | fb7e672093aa37b0c0e65347dc65875584e76070 (patch) | |
tree | fac1bcfe14b8984452c413bc747f954637eb6906 | |
parent | c63dcaa034093cd1dc217c06c102127d18ac524f (diff) | |
parent | 02a4098c915add7c5f9b9002cf5ff0d6783e091d (diff) | |
download | rabbitmq-server-fb7e672093aa37b0c0e65347dc65875584e76070.tar.gz |
Merge heads
-rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 3 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/changelog | 6 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/control | 5 | ||||
-rw-r--r-- | src/file_handle_cache.erl | 16 | ||||
-rw-r--r-- | src/rabbit.erl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 80 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 23 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 25 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 48 | ||||
-rw-r--r-- | src/rabbit_prelaunch.erl | 4 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 47 | ||||
-rw-r--r-- | src/rabbit_upgrade.erl | 55 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 11 |
13 files changed, 185 insertions, 141 deletions
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index ae9b2059..45af770a 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -120,6 +120,9 @@ done rm -rf %{buildroot} %changelog +* Tue Mar 22 2011 Alexandru Scvortov <alexandru@rabbitmq.com> 2.4.0-1 +- New Upstream Release + * Thu Feb 3 2011 simon@rabbitmq.com 2.3.1-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 12165dc0..2ca5074f 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (2.4.0-1) lucid; urgency=low + + * New Upstream Release + + -- Alexandru Scvortov <alexandru@rabbitmq.com> Tue, 22 Mar 2011 17:34:31 +0000 + rabbitmq-server (2.3.1-1) lucid; urgency=low * New Upstream Release diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index 02da0cc6..45f5c5c4 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -7,10 +7,7 @@ Standards-Version: 3.8.0 Package: rabbitmq-server Architecture: all -# erlang-inets is not a strict dependency, but it's needed to allow -# the installation of plugins that use mochiweb. Ideally it would be a -# "Recommends" instead, but gdebi does not install those. -Depends: erlang-base (>= 1:12.b.3) | erlang-base-hipe (>= 1:12.b.3), erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), erlang-inets | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends} +Depends: erlang-nox (>= 1:12.b.3), adduser, logrotate, ${misc:Depends} Description: An AMQP server written in Erlang RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 4f036571..61b08d49 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -970,12 +970,13 @@ queue_fold(Fun, Init, Q) -> filter_pending(Fun, {Count, Queue}) -> {Delta, Queue1} = - queue_fold(fun (Item, {DeltaN, QueueN}) -> - case Fun(Item) of - true -> {DeltaN, queue:in(Item, QueueN)}; - false -> {DeltaN - requested(Item), QueueN} - end - end, {0, queue:new()}, Queue), + queue_fold( + fun (Item = #pending { requested = Requested }, {DeltaN, QueueN}) -> + case Fun(Item) of + true -> {DeltaN, queue:in(Item, QueueN)}; + false -> {DeltaN - Requested, QueueN} + end + end, {0, queue:new()}, Queue), {Count + Delta, Queue1}. pending_new() -> @@ -1021,9 +1022,6 @@ adjust_alarm(OldState, NewState) -> end, NewState. -requested({_Kind, _Pid, Requested, _From}) -> - Requested. - process_pending(State = #fhc_state { limit = infinity }) -> State; process_pending(State) -> diff --git a/src/rabbit.erl b/src/rabbit.erl index e60886fa..807e9e7d 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -192,7 +192,8 @@ %%---------------------------------------------------------------------------- prepare() -> - ok = ensure_working_log_handlers(). + ok = ensure_working_log_handlers(), + ok = rabbit_upgrade:maybe_upgrade_mnesia(). start() -> try diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7c4b5190..3f5758ce 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -439,19 +439,26 @@ gb_trees_cons(Key, Value, Tree) -> none -> gb_trees:insert(Key, [Value], Tree) end. -record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> - {never, State}; -record_confirm_message(#delivery{sender = ChPid, +should_confirm_message(#delivery{msg_seq_no = undefined}, _State) -> + never; +should_confirm_message(#delivery{sender = ChPid, msg_seq_no = MsgSeqNo, message = #basic_message { is_persistent = true, id = MsgId}}, - State = #q{q = #amqqueue{durable = true}, - msg_id_to_channel = MTC}) -> - {eventually, - State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)}}; -record_confirm_message(_Delivery, State) -> - {immediately, State}. + #q{q = #amqqueue{durable = true}}) -> + {eventually, ChPid, MsgSeqNo, MsgId}; +should_confirm_message(_Delivery, _State) -> + immediately. + +needs_confirming({eventually, _, _, _}) -> true; +needs_confirming(_) -> false. + +maybe_record_confirm_message({eventually, ChPid, MsgSeqNo, MsgId}, + State = #q{msg_id_to_channel = MTC}) -> + State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)}; +maybe_record_confirm_message(_Confirm, State) -> + State. run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, @@ -465,9 +472,10 @@ run_message_queue(State) -> attempt_delivery(#delivery{txn = none, sender = ChPid, message = Message, - msg_seq_no = MsgSeqNo}, - {NeedsConfirming, State = #q{backing_queue = BQ}}) -> - case NeedsConfirming of + msg_seq_no = MsgSeqNo} = Delivery, + State = #q{backing_queue = BQ}) -> + Confirm = should_confirm_message(Delivery, State), + case Confirm of immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); _ -> ok end, @@ -481,36 +489,36 @@ attempt_delivery(#delivery{txn = none, BQ:publish_delivered( AckRequired, Message, (?BASE_MESSAGE_PROPERTIES)#message_properties{ - needs_confirming = (NeedsConfirming =:= eventually)}, + needs_confirming = needs_confirming(Confirm)}, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, {Delivered, State1} = deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State), - {Delivered, NeedsConfirming, State1}; + {Delivered, Confirm, State1}; attempt_delivery(#delivery{txn = Txn, sender = ChPid, - message = Message}, - {NeedsConfirming, State = #q{backing_queue = BQ, - backing_queue_state = BQS}}) -> + message = Message} = Delivery, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> store_ch_record((ch_record(ChPid))#cr{txn = Txn}), BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS), - {true, NeedsConfirming, State#q{backing_queue_state = BQS1}}. - -deliver_or_enqueue(Delivery, State) -> - case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of - {true, _, State1} -> - State1; - {false, NeedsConfirming, State1 = #q{backing_queue = BQ, - backing_queue_state = BQS}} -> - #delivery{message = Message} = Delivery, - BQS1 = BQ:publish(Message, - (message_properties(State)) #message_properties{ - needs_confirming = - (NeedsConfirming =:= eventually)}, - BQS), - ensure_ttl_timer(State1#q{backing_queue_state = BQS1}) + {true, should_confirm_message(Delivery, State), + State#q{backing_queue_state = BQS1}}. + +deliver_or_enqueue(Delivery = #delivery{message = Message}, State) -> + {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), + State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = + maybe_record_confirm_message(Confirm, State1), + case Delivered of + true -> State2; + false -> BQS1 = + BQ:publish(Message, + (message_properties(State)) #message_properties{ + needs_confirming = needs_confirming(Confirm)}, + BQS), + ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> @@ -829,9 +837,11 @@ handle_call({deliver_immediately, Delivery}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, _NeedsConfirming, State1} = - attempt_delivery(Delivery, record_confirm_message(Delivery, State)), - reply(Delivered, State1); + {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), + reply(Delivered, case Delivered of + true -> maybe_record_confirm_message(Confirm, State1); + false -> State1 + end); handle_call({deliver, Delivery}, From, State) -> %% Synchronous, "mandatory" delivery mode. Reply asap. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 7ddb7814..6167790e 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -331,17 +331,18 @@ group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) -> group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed). maybe_auto_delete(XName, Bindings, Deletions) -> - case mnesia:read({rabbit_exchange, XName}) of - [] -> - add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions); - [X] -> - add_deletion(XName, {X, not_deleted, Bindings}, - case rabbit_exchange:maybe_auto_delete(X) of - not_deleted -> Deletions; - {deleted, Deletions1} -> combine_deletions( - Deletions, Deletions1) - end) - end. + {Entry, Deletions1} = + case mnesia:read({rabbit_exchange, XName}) of + [] -> {{undefined, not_deleted, Bindings}, Deletions}; + [X] -> case rabbit_exchange:maybe_auto_delete(X) of + not_deleted -> + {{X, not_deleted, Bindings}, Deletions}; + {deleted, Deletions2} -> + {{X, deleted, Bindings}, + combine_deletions(Deletions, Deletions2)} + end + end, + add_deletion(XName, Entry, Deletions1). delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index e661e5e3..9ca52327 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -88,8 +88,8 @@ status() -> {running_nodes, running_clustered_nodes()}]. init() -> - ok = ensure_mnesia_running(), - ok = ensure_mnesia_dir(), + ensure_mnesia_running(), + ensure_mnesia_dir(), ok = init_db(read_cluster_nodes_config(), true, true), ok. @@ -108,8 +108,8 @@ force_cluster(ClusterNodes) -> %% node. If Force is false, only connections to online nodes are %% allowed. cluster(ClusterNodes, Force) -> - ok = ensure_mnesia_not_running(), - ok = ensure_mnesia_dir(), + ensure_mnesia_not_running(), + ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), try ok = init_db(ClusterNodes, Force, true), @@ -434,8 +434,9 @@ init_db(ClusterNodes, Force, DoSecondaryLocalUpgrades) -> %% We're the first node up case rabbit_upgrade:maybe_upgrade_local() of ok -> ensure_schema_integrity(); - version_not_available -> schema_ok_or_move() - end; + version_not_available -> ok = schema_ok_or_move() + end, + ok; {[AnotherNode|_], _} -> %% Subsequent node in cluster, catch up ensure_version_ok( @@ -459,7 +460,8 @@ init_db(ClusterNodes, Force, DoSecondaryLocalUpgrades) -> end; false -> ok end, - ensure_schema_integrity() + ensure_schema_integrity(), + ok end; {error, Reason} -> %% one reason we may end up here is if we try to join @@ -499,7 +501,7 @@ create_schema() -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), ok = create_tables(), - ok = ensure_schema_integrity(), + ensure_schema_integrity(), ok = rabbit_version:record_desired(). move_db() -> @@ -520,11 +522,12 @@ move_db() -> {error, Reason} -> throw({error, {cannot_backup_mnesia, MnesiaDir, BackupDir, Reason}}) end, - ok = ensure_mnesia_dir(), + ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), ok. copy_db(Destination) -> + ok = ensure_mnesia_not_running(), rabbit_misc:recursive_copy(dir(), Destination). create_tables() -> @@ -599,12 +602,12 @@ wait_for_tables(TableNames) -> end. reset(Force) -> - ok = ensure_mnesia_not_running(), + ensure_mnesia_not_running(), Node = node(), case Force of true -> ok; false -> - ok = ensure_mnesia_dir(), + ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), {Nodes, RunningNodes} = try diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index a08bbd70..bb26de64 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -850,16 +850,16 @@ terminate(_Reason, State = #msstate { index_state = IndexState, State1 = case CurHdl of undefined -> State; _ -> State2 = internal_sync(State), - file_handle_cache:close(CurHdl), + ok = file_handle_cache:close(CurHdl), State2 end, State3 = close_all_handles(State1), - store_file_summary(FileSummaryEts, Dir), - [ets:delete(T) || + ok = store_file_summary(FileSummaryEts, Dir), + [true = ets:delete(T) || T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]], IndexModule:terminate(IndexState), - store_recovery_terms([{client_refs, dict:fetch_keys(Clients)}, - {index_module, IndexModule}], Dir), + ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)}, + {index_module, IndexModule}], Dir), State3 #msstate { index_state = undefined, current_file_handle = undefined }. @@ -912,13 +912,16 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, false -> [{CRef, MsgIds} | NS] end end, [], CTM), - case {Syncs, CGs} of - {[], []} -> ok; - _ -> file_handle_cache:sync(CurHdl) - end, + ok = case {Syncs, CGs} of + {[], []} -> ok; + _ -> file_handle_cache:sync(CurHdl) + end, [K() || K <- lists:reverse(Syncs)], - [client_confirm(CRef, MsgIds, written, State1) || {CRef, MsgIds} <- CGs], - State1 #msstate { cref_to_msg_ids = dict:new(), on_sync = [] }. + State2 = lists:foldl( + fun ({CRef, MsgIds}, StateN) -> + client_confirm(CRef, MsgIds, written, StateN) + end, State1, CGs), + State2 #msstate { on_sync = [] }. write_action({true, not_found}, _MsgId, State) -> {ignore, undefined, State}; @@ -1147,7 +1150,7 @@ orddict_store(Key, Val, Dict) -> orddict:store(Key, Val, Dict). update_pending_confirms(Fun, CRef, - State = #msstate { clients = Clients, + State = #msstate { clients = Clients, cref_to_msg_ids = CTM }) -> case dict:fetch(CRef, Clients) of {undefined, _CloseFDsFun} -> State; @@ -1466,7 +1469,7 @@ recover_file_summary(false, _Dir) -> recover_file_summary(true, Dir) -> Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME), case ets:file2tab(Path) of - {ok, Tid} -> file:delete(Path), + {ok, Tid} -> ok = file:delete(Path), {true, Tid}; {error, _Error} -> recover_file_summary(false, Dir) end. @@ -1533,9 +1536,7 @@ scan_file_for_valid_messages(Dir, FileName) -> Hdl, filelib:file_size( form_filename(Dir, FileName)), fun scan_fun/2, []), - %% if something really bad has happened, - %% the close could fail, but ignore - file_handle_cache:close(Hdl), + ok = file_handle_cache:close(Hdl), Valid; {error, enoent} -> {ok, [], 0}; {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}} @@ -1971,32 +1972,33 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, force_recovery(BaseDir, Store) -> Dir = filename:join(BaseDir, atom_to_list(Store)), - file:delete(filename:join(Dir, ?CLEAN_FILENAME)), + ok = file:delete(filename:join(Dir, ?CLEAN_FILENAME)), recover_crashed_compactions(BaseDir), ok. foreach_file(D, Fun, Files) -> - [Fun(filename:join(D, File)) || File <- Files]. + [ok = Fun(filename:join(D, File)) || File <- Files]. foreach_file(D1, D2, Fun, Files) -> - [Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files]. + [ok = Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files]. transform_dir(BaseDir, Store, TransformFun) -> Dir = filename:join(BaseDir, atom_to_list(Store)), TmpDir = filename:join(Dir, ?TRANSFORM_TMP), TransformFile = fun (A, B) -> transform_msg_file(A, B, TransformFun) end, + CopyFile = fun (Src, Dst) -> {ok, _Bytes} = file:copy(Src, Dst), ok end, case filelib:is_dir(TmpDir) of true -> throw({error, transform_failed_previously}); false -> FileList = list_sorted_file_names(Dir, ?FILE_EXTENSION), foreach_file(Dir, TmpDir, TransformFile, FileList), foreach_file(Dir, fun file:delete/1, FileList), - foreach_file(TmpDir, Dir, fun file:copy/2, FileList), + foreach_file(TmpDir, Dir, CopyFile, FileList), foreach_file(TmpDir, fun file:delete/1, FileList), ok = file:del_dir(TmpDir) end. transform_msg_file(FileOld, FileNew, TransformFun) -> - rabbit_misc:ensure_parent_dirs_exist(FileNew), + ok = rabbit_misc:ensure_parent_dirs_exist(FileNew), {ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []), {ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write], [{write_buffer, @@ -2009,6 +2011,6 @@ transform_msg_file(FileOld, FileNew, TransformFun) -> {ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew), ok end, ok), - file_handle_cache:close(RefOld), - file_handle_cache:close(RefNew), + ok = file_handle_cache:close(RefOld), + ok = file_handle_cache:close(RefNew), ok. diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 92ad6a24..8800e8d6 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -235,10 +235,8 @@ post_process_script(ScriptFile) -> {error, {failed_to_load_script, Reason}} end. -process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) -> - [{apply,{rabbit,prepare,[]}}, Entry]; process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) -> - [{apply,{rabbit_upgrade,maybe_upgrade_mnesia,[]}}, Entry]; + [{apply,{rabbit,prepare,[]}}, Entry]; process_entry(Entry) -> [Entry]. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b8c3f4a9..ca046c91 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1630,23 +1630,42 @@ test_file_handle_cache() -> ok = file_handle_cache:set_limit(5), %% 1 or 2 sockets, 2 msg_stores TmpDir = filename:join(rabbit_mnesia:dir(), "tmp"), ok = filelib:ensure_dir(filename:join(TmpDir, "nothing")), + [Src1, Dst1, Src2, Dst2] = Files = + [filename:join(TmpDir, Str) || Str <- ["file1", "file2", "file3", "file4"]], + Content = <<"foo">>, + CopyFun = fun (Src, Dst) -> + ok = file:write_file(Src, Content), + {ok, SrcHdl} = file_handle_cache:open(Src, [read], []), + {ok, DstHdl} = file_handle_cache:open(Dst, [write], []), + Size = size(Content), + {ok, Size} = file_handle_cache:copy(SrcHdl, DstHdl, Size), + ok = file_handle_cache:delete(SrcHdl), + ok = file_handle_cache:delete(DstHdl) + end, Pid = spawn(fun () -> {ok, Hdl} = file_handle_cache:open( - filename:join(TmpDir, "file3"), + filename:join(TmpDir, "file5"), [write], []), - receive close -> ok end, - file_handle_cache:delete(Hdl) + receive {next, Pid1} -> Pid1 ! {next, self()} end, + file_handle_cache:delete(Hdl), + %% This will block and never return, so we + %% exercise the fhc tidying up the pending + %% queue on the death of a process. + ok = CopyFun(Src1, Dst1) end), - Src = filename:join(TmpDir, "file1"), - Dst = filename:join(TmpDir, "file2"), - Content = <<"foo">>, - ok = file:write_file(Src, Content), - {ok, SrcHdl} = file_handle_cache:open(Src, [read], []), - {ok, DstHdl} = file_handle_cache:open(Dst, [write], []), - Size = size(Content), - {ok, Size} = file_handle_cache:copy(SrcHdl, DstHdl, Size), - ok = file_handle_cache:delete(SrcHdl), - file_handle_cache:delete(DstHdl), - Pid ! close, + ok = CopyFun(Src1, Dst1), + ok = file_handle_cache:set_limit(2), + Pid ! {next, self()}, + receive {next, Pid} -> ok end, + timer:sleep(100), + Pid1 = spawn(fun () -> CopyFun(Src2, Dst2) end), + timer:sleep(100), + erlang:monitor(process, Pid), + erlang:monitor(process, Pid1), + exit(Pid, kill), + exit(Pid1, kill), + receive {'DOWN', _MRef, process, Pid, _Reason} -> ok end, + receive {'DOWN', _MRef1, process, Pid1, _Reason1} -> ok end, + [file:delete(File) || File <- Files], ok = file_handle_cache:set_limit(Limit), passed. diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 244be522..3981b173 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -91,7 +91,7 @@ %% ------------------------------------------------------------------- -ensure_backup() -> +ensure_backup_taken() -> case filelib:is_file(lock_filename()) of false -> case filelib:is_dir(backup_dir()) of false -> ok = take_backup(); @@ -101,7 +101,6 @@ ensure_backup() -> end. take_backup() -> - rabbit:prepare(), %% Ensure we have logs for this BackupDir = backup_dir(), case rabbit_mnesia:copy_db(BackupDir) of ok -> info("upgrades: Mnesia dir backed up to ~p~n", @@ -109,7 +108,7 @@ take_backup() -> {error, E} -> throw({could_not_back_up_mnesia_dir, E}) end. -maybe_remove_backup() -> +ensure_backup_removed() -> case filelib:is_dir(backup_dir()) of true -> ok = remove_backup(); _ -> ok @@ -134,11 +133,11 @@ maybe_upgrade_mnesia() -> {ok, []} -> ok; {ok, Upgrades} -> - rabbit:prepare(), %% Ensure we have logs for this - case upgrade_mode(AllNodes) of - primary -> primary_upgrade(Upgrades, AllNodes); - secondary -> secondary_upgrade(AllNodes) - end + ensure_backup_taken(), + ok = case upgrade_mode(AllNodes) of + primary -> primary_upgrade(Upgrades, AllNodes); + secondary -> secondary_upgrade(AllNodes) + end end. upgrade_mode(AllNodes) -> @@ -200,30 +199,32 @@ die(Msg, Args) -> primary_upgrade(Upgrades, Nodes) -> Others = Nodes -- [node()], - apply_upgrades( - mnesia, - Upgrades, - fun () -> - force_tables(), - case Others of - [] -> ok; - _ -> info("mnesia upgrades: Breaking cluster~n", []), - [{atomic, ok} = mnesia:del_table_copy(schema, Node) - || Node <- Others] - end - end), + ok = apply_upgrades( + mnesia, + Upgrades, + fun () -> + force_tables(), + case Others of + [] -> ok; + _ -> info("mnesia upgrades: Breaking cluster~n", []), + [{atomic, ok} = mnesia:del_table_copy(schema, Node) + || Node <- Others] + end + end), ok. force_tables() -> [mnesia:force_load_table(T) || T <- rabbit_mnesia:table_names()]. secondary_upgrade(AllNodes) -> + %% must do this before we wipe out schema + IsDiscNode = is_disc_node(), rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), cannot_delete_schema), %% Note that we cluster with all nodes, rather than all disc nodes %% (as we can't know all disc nodes at this point). This is safe as %% we're not writing the cluster config, just setting up Mnesia. - ClusterNodes = case is_disc_node() of + ClusterNodes = case IsDiscNode of true -> AllNodes; false -> AllNodes -- [node()] end, @@ -247,17 +248,19 @@ maybe_upgrade_local() -> case rabbit_version:upgrades_required(local) of {error, version_not_available} -> version_not_available; {error, _} = Err -> throw(Err); - {ok, []} -> maybe_remove_backup(); + {ok, []} -> ensure_backup_removed(), + ok; {ok, Upgrades} -> mnesia:stop(), - apply_upgrades(local, Upgrades, - fun () -> ok end), - maybe_remove_backup() + ensure_backup_taken(), + ok = apply_upgrades(local, Upgrades, + fun () -> ok end), + ensure_backup_removed(), + ok end. %% ------------------------------------------------------------------- apply_upgrades(Scope, Upgrades, Fun) -> - ensure_backup(), ok = rabbit_misc:lock_file(lock_filename()), info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 6e3460c5..7a1102e5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -150,10 +150,13 @@ %% responsive. %% %% In the queue we keep track of both messages that are pending -%% delivery and messages that are pending acks. This ensures that -%% purging (deleting the former) and deletion (deleting the former and -%% the latter) are both cheap and do require any scanning through qi -%% segments. +%% delivery and messages that are pending acks. In the event of a +%% queue purge, we only need to load qi segments if the queue has +%% elements in deltas (i.e. it came under significant memory +%% pressure). In the event of a queue deletion, in addition to the +%% preceding, by keeping track of pending acks in RAM, we do not need +%% to search through qi segments looking for messages that are yet to +%% be acknowledged. %% %% Pending acks are recorded in memory either as the tuple {SeqId, %% MsgId, MsgProps} (tuple-form) or as the message itself (message- |