From 1e21ce8437d5bf02c5bf3dbf6659d5ba73ef7131 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 12 Apr 2012 14:51:35 +0100 Subject: track queues which may have delivered non-autoack messages to channel ...so that we can ensure that acks/requeues have been seen by those queues during an orderly shutdown by the time a 'close' (in the error case) or 'close_ok' is sent. --- src/rabbit_channel.erl | 41 ++++++++++++++++++++++++++++------------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 846890a1..22c6a223 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,9 +36,9 @@ conn_name, limiter, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, virtual_host, most_recently_declared_queue, queue_monitors, - consumer_mapping, blocking, queue_consumers, queue_collector_pid, - stats_timer, confirm_enabled, publish_seqno, unconfirmed, - confirmed, capabilities, trace_state}). + consumer_mapping, blocking, queue_consumers, delivering_queues, + queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, + unconfirmed, confirmed, capabilities, trace_state}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -198,6 +198,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, consumer_mapping = dict:new(), blocking = sets:new(), queue_consumers = dict:new(), + delivering_queues = sets:new(), queue_collector_pid = CollectorPid, confirm_enabled = false, publish_seqno = 1, @@ -331,10 +332,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), State2 = queue_blocked(QPid, State1), State3 = handle_consuming_queue_down(QPid, State2), + State4 = handle_delivering_queue_down(QPid, State3), credit_flow:peer_down(QPid), erase_queue_stats(QPid), noreply(State3#ch{queue_monitors = pmon:erase( - QPid, State3#ch.queue_monitors)}); + QPid, State4#ch.queue_monitors)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -657,7 +659,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, - Msg = {_QName, _QPid, _MsgId, Redelivered, + Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}}} -> @@ -669,7 +671,8 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, message_count = MessageCount}, Content), - {noreply, record_sent(none, not(NoAck), Msg, State)}; + State1 = monitor_delivering_queue(NoAck, QPid, State), + {noreply, record_sent(none, not(NoAck), Msg, State1)}; empty -> {reply, #'basic.get_empty'{}, State} end; @@ -707,10 +710,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ActualConsumerTag})), Q} end) of - {ok, Q} -> - State1 = State#ch{consumer_mapping = - dict:store(ActualConsumerTag, Q, - ConsumerMapping)}, + {ok, Q = #amqqueue{pid = QPid}} -> + CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping), + State1 = monitor_delivering_queue( + NoAck, QPid, State#ch{consumer_mapping = CM1}), {noreply, case NoWait of true -> consumer_monitor(ActualConsumerTag, State1); @@ -1108,6 +1111,13 @@ consumer_monitor(ConsumerTag, State end. +monitor_delivering_queue(true, _QPid, State) -> + State; +monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons, + delivering_queues = DQ}) -> + State#ch{queue_monitors = pmon:monitor(QPid, QMons), + delivering_queues = sets:add_element(QPid, DQ)}. + handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> case rabbit_misc:is_abnormal_termination(Reason) of true -> {MXs, UC1} = dtree:take_all(QPid, UC), @@ -1134,6 +1144,9 @@ handle_consuming_queue_down(QPid, State#ch{consumer_mapping = ConsumerMapping1, queue_consumers = dict:erase(QPid, QCons)}. +handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> + State#ch{delivering_queues = sets:del_element(QPid, DQ)}. + binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, @@ -1269,9 +1282,11 @@ new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), notify_queues(State = #ch{state = closing}) -> {ok, State}; -notify_queues(State = #ch{consumer_mapping = Consumers}) -> - {rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()), - State#ch{state = closing}}. +notify_queues(State = #ch{consumer_mapping = Consumers, + delivering_queues = DQ }) -> + QPids = sets:to_list( + sets:union(sets:from_list(consumer_queues(Consumers)), DQ)), + {rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}. fold_per_queue(_F, Acc, []) -> Acc; -- cgit v1.2.1 From 0bbde99b7539b773679fd74e9978b1a2bad08702 Mon Sep 17 00:00:00 2001 From: Francesco Mazzoli Date: Tue, 17 Apr 2012 14:44:09 +0100 Subject: Moved various functions that query the status of applications from rabbit_control to rabbit_misc. --- Makefile | 2 +- src/rabbit.erl | 5 +-- src/rabbit_control.erl | 75 +++---------------------------------------- src/rabbit_misc.erl | 86 ++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 92 insertions(+), 76 deletions(-) diff --git a/Makefile b/Makefile index db7462a6..3069387c 100644 --- a/Makefile +++ b/Makefile @@ -207,7 +207,7 @@ start-background-node: all -rm -f $(RABBITMQ_MNESIA_DIR).pid mkdir -p $(RABBITMQ_MNESIA_DIR) setsid sh -c "$(MAKE) run-background-node > $(RABBITMQ_MNESIA_DIR)/startup_log 2> $(RABBITMQ_MNESIA_DIR)/startup_err" & - sleep 1 + sleep 5 start-rabbit-on-node: all echo "rabbit:start()." | $(ERL_CALL) diff --git a/src/rabbit.erl b/src/rabbit.erl index eec7e34e..bee2634d 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -347,10 +347,7 @@ status() -> is_running() -> is_running(node()). is_running(Node) -> - case rpc:call(Node, application, which_applications, [infinity]) of - {badrpc, _} -> false; - Apps -> proplists:is_defined(rabbit, Apps) - end. + rabbit_misc:is_running(Node, rabbit). environment() -> lists:keysort( diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 51f88c8f..17a69725 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -20,7 +20,6 @@ -export([start/0, stop/0, action/5]). -define(RPC_TIMEOUT, infinity). --define(EXTERNAL_CHECK_INTERVAL, 1000). -define(QUIET_OPT, "-q"). -define(NODE_OPT, "-n"). @@ -157,8 +156,8 @@ action(stop, Node, Args, _Opts, Inform) -> Inform("Stopping and halting node ~p", [Node]), Res = call(Node, {rabbit, stop_and_halt, []}), case {Res, Args} of - {ok, [PidFile]} -> wait_for_process_death( - read_pid_file(PidFile, false)); + {ok, [PidFile]} -> rabbit_misc:wait_for_process_death( + rabbit_misc:read_pid_file(PidFile, false)); {ok, [_, _| _]} -> exit({badarg, Args}); _ -> ok end, @@ -380,75 +379,9 @@ action(eval, Node, [Expr], _Opts, _Inform) -> %%---------------------------------------------------------------------------- wait_for_application(Node, PidFile, Inform) -> - Pid = read_pid_file(PidFile, true), + Pid = rabbit_misc:read_pid_file(PidFile, true), Inform("pid is ~s", [Pid]), - wait_for_application(Node, Pid). - -wait_for_application(Node, Pid) -> - case process_up(Pid) of - true -> case rabbit:is_running(Node) of - true -> ok; - false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), - wait_for_application(Node, Pid) - end; - false -> {error, process_not_running} - end. - -wait_for_process_death(Pid) -> - case process_up(Pid) of - true -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), - wait_for_process_death(Pid); - false -> ok - end. - -read_pid_file(PidFile, Wait) -> - case {file:read_file(PidFile), Wait} of - {{ok, Bin}, _} -> - S = string:strip(binary_to_list(Bin), right, $\n), - try list_to_integer(S) - catch error:badarg -> - exit({error, {garbage_in_pid_file, PidFile}}) - end, - S; - {{error, enoent}, true} -> - timer:sleep(?EXTERNAL_CHECK_INTERVAL), - read_pid_file(PidFile, Wait); - {{error, _} = E, _} -> - exit({error, {could_not_read_pid, E}}) - end. - -% Test using some OS clunkiness since we shouldn't trust -% rpc:call(os, getpid, []) at this point -process_up(Pid) -> - with_os([{unix, fun () -> - system("ps -p " ++ Pid - ++ " >/dev/null 2>&1") =:= 0 - end}, - {win32, fun () -> - Res = os:cmd("tasklist /nh /fi \"pid eq " ++ - Pid ++ "\" 2>&1"), - case re:run(Res, "erl\\.exe", [{capture, none}]) of - match -> true; - _ -> false - end - end}]). - -with_os(Handlers) -> - {OsFamily, _} = os:type(), - case proplists:get_value(OsFamily, Handlers) of - undefined -> throw({unsupported_os, OsFamily}); - Handler -> Handler() - end. - -% Like system(3) -system(Cmd) -> - ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'", - Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]), - receive {Port, {exit_status, Status}} -> Status end. - -% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'" -escape_quotes(Cmd) -> - lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). + rabbit_misc:wait_for_application(Node, Pid, rabbit). format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 0aacd654..e224bf7b 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -61,6 +61,10 @@ -export([quit/1]). -export([os_cmd/1]). -export([gb_sets_difference/2]). +-export([is_running/2, wait_for_application/3, wait_for_process_death/1, + read_pid_file/2]). + +-define(EXTERNAL_CHECK_INTERVAL, 1000). %%---------------------------------------------------------------------------- @@ -206,6 +210,11 @@ -spec(quit/1 :: (integer() | string()) -> no_return()). -spec(os_cmd/1 :: (string()) -> string()). -spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()). +-spec(is_running/2 :: (node(), atom()) -> boolean()). +-spec(wait_for_application/3 :: (node(), pid(), atom()) + -> ok | {error, process_not_running}). +-spec(wait_for_process_death/1 :: (pid()) -> ok). +-spec(read_pid_file/2 :: (file:name(), boolean()) -> pid() | no_return()). -endif. @@ -917,3 +926,80 @@ os_cmd(Command) -> gb_sets_difference(S1, S2) -> gb_sets:fold(fun gb_sets:delete_any/2, S1, S2). + +%%---------------------------------------------------------------------------- + +is_running(Node, Application) -> + case rpc:call(Node, application, which_applications, [infinity]) of + {badrpc, _} -> false; + Apps -> proplists:is_defined(Application, Apps) + end. + +wait_for_application(Node, Pid, Application) -> + case process_up(Pid) of + true -> case is_running(Node, Application) of + true -> ok; + false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), + wait_for_application(Node, Pid, Application) + end; + false -> {error, process_not_running} + end. + +wait_for_process_death(Pid) -> + case process_up(Pid) of + true -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), + wait_for_process_death(Pid); + false -> ok + end. + +read_pid_file(PidFile, Wait) -> + case {file:read_file(PidFile), Wait} of + {{ok, Bin}, _} -> + S = string:strip(binary_to_list(Bin), right, $\n), + try list_to_integer(S) + catch error:badarg -> + exit({error, {garbage_in_pid_file, PidFile}}) + end, + S; + {{error, enoent}, true} -> + timer:sleep(?EXTERNAL_CHECK_INTERVAL), + read_pid_file(PidFile, Wait); + {{error, _} = E, _} -> + exit({error, {could_not_read_pid, E}}) + end. + +% Test using some OS clunkiness since we shouldn't trust +% rpc:call(os, getpid, []) at this point +process_up(Pid) -> + with_os([{unix, fun () -> + system("ps -p " ++ Pid + ++ " >/dev/null 2>&1") =:= 0 + end}, + {win32, fun () -> + Res = os:cmd("tasklist /nh /fi \"pid eq " ++ + Pid ++ "\" 2>&1"), + case re:run(Res, "erl\\.exe", [{capture, none}]) of + match -> true; + _ -> false + end + end}]). + +with_os(Handlers) -> + {OsFamily, _} = os:type(), + case proplists:get_value(OsFamily, Handlers) of + undefined -> throw({unsupported_os, OsFamily}); + Handler -> Handler() + end. + +% Like system(3) +system(Cmd) -> + ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'", + Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]), + receive {Port, {exit_status, Status}} -> Status end. + +% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'" +escape_quotes(Cmd) -> + lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). + +format_parse_error({_Line, Mod, Err}) -> + lists:flatten(Mod:format_error(Err)). -- cgit v1.2.1 From 59a8cdb514cc539ba92f7e550284f00ba118345c Mon Sep 17 00:00:00 2001 From: Francesco Mazzoli Date: Tue, 17 Apr 2012 16:36:15 +0100 Subject: Added wait_node script to wait for the node to be up. --- Makefile | 2 +- src/rabbit_misc.erl | 9 +++------ wait_node | 21 +++++++++++++++++++++ 3 files changed, 25 insertions(+), 7 deletions(-) create mode 100755 wait_node diff --git a/Makefile b/Makefile index 3069387c..5da62956 100644 --- a/Makefile +++ b/Makefile @@ -207,7 +207,7 @@ start-background-node: all -rm -f $(RABBITMQ_MNESIA_DIR).pid mkdir -p $(RABBITMQ_MNESIA_DIR) setsid sh -c "$(MAKE) run-background-node > $(RABBITMQ_MNESIA_DIR)/startup_log 2> $(RABBITMQ_MNESIA_DIR)/startup_err" & - sleep 5 + ./wait_node $(RABBITMQ_NODENAME) $(RABBITMQ_MNESIA_DIR).pid start-rabbit-on-node: all echo "rabbit:start()." | $(ERL_CALL) diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index e224bf7b..9456e3c1 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -211,10 +211,10 @@ -spec(os_cmd/1 :: (string()) -> string()). -spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()). -spec(is_running/2 :: (node(), atom()) -> boolean()). --spec(wait_for_application/3 :: (node(), pid(), atom()) +-spec(wait_for_application/3 :: (node(), string(), atom()) -> ok | {error, process_not_running}). --spec(wait_for_process_death/1 :: (pid()) -> ok). --spec(read_pid_file/2 :: (file:name(), boolean()) -> pid() | no_return()). +-spec(wait_for_process_death/1 :: (string()) -> ok). +-spec(read_pid_file/2 :: (file:name(), boolean()) -> string() | no_return()). -endif. @@ -1000,6 +1000,3 @@ system(Cmd) -> % Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'" escape_quotes(Cmd) -> lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). - -format_parse_error({_Line, Mod, Err}) -> - lists:flatten(Mod:format_error(Err)). diff --git a/wait_node b/wait_node new file mode 100755 index 00000000..e4549b5c --- /dev/null +++ b/wait_node @@ -0,0 +1,21 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! -sname wait_node -pa ./ebin + +main([NodeStr, PidFile]) -> + case {code:load_file(rabbit_misc), code:load_file(rabbit_nodes)} of + {{module, _}, {module, _}} -> + ok; + _ -> + io:format("Compile with 'make' before running this script~n") + end, + Node = rabbit_nodes:make(NodeStr), + Pid = rabbit_misc:read_pid_file(PidFile, true), + io:format("pid is ~s~n", [Pid]), + rabbit_misc:wait_for_application(Node, Pid, kernel); +main(_) -> + usage(). + +usage() -> + io:format("Usage: ./wait_node node pidfile~n"), + halt(1). -- cgit v1.2.1 From b6d851b0638e8e0f6262ddecb2566c4ec1892d08 Mon Sep 17 00:00:00 2001 From: Francesco Mazzoli Date: Tue, 17 Apr 2012 16:37:08 +0100 Subject: Exiting when modules can't be found. --- wait_node | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/wait_node b/wait_node index e4549b5c..ee7dfeab 100755 --- a/wait_node +++ b/wait_node @@ -7,7 +7,8 @@ main([NodeStr, PidFile]) -> {{module, _}, {module, _}} -> ok; _ -> - io:format("Compile with 'make' before running this script~n") + io:format("Compile with 'make' before running this script~n"), + halt(1) end, Node = rabbit_nodes:make(NodeStr), Pid = rabbit_misc:read_pid_file(PidFile, true), -- cgit v1.2.1 From e133222d58265c805f701661259bf7310aeb32b1 Mon Sep 17 00:00:00 2001 From: Francesco Mazzoli Date: Wed, 18 Apr 2012 15:56:31 +0100 Subject: Re-creating bug24863 branching of default --- src/rabbit_amqqueue_process.erl | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3caf728b..2764c027 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -717,6 +717,14 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. +ack_nacked_messages(AckTags, State = #q{dlx = undefined, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State#q{backing_queue_state = BQS1}; +ack_nacked_messages(_AckTags, State) -> + State. + dead_letter_fun(_Reason, #q{dlx = undefined}) -> undefined; dead_letter_fun(Reason, _State) -> @@ -1226,13 +1234,16 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> noreply(subtract_acks( ChPid, AckTags, State, case Requeue of - true -> fun (State1) -> requeue_and_run(AckTags, State1) end; - false -> Fun = dead_letter_fun(rejected, State), - fun (State1 = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - BQS1 = BQ:fold(Fun, BQS, AckTags), - State1#q{backing_queue_state = BQS1} - end + true -> + fun (State1) -> requeue_and_run(AckTags, State1) end; + false -> + Fun = dead_letter_fun(rejected, State), + fun (State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQS1 = BQ:fold(Fun, BQS, AckTags), + ack_nacked_messages( + AckTags, State1#q{backing_queue_state = BQS1}) + end end)); handle_cast(delete_immediately, State) -> -- cgit v1.2.1 From 9490a9374f425a292299cd899e03f34293167867 Mon Sep 17 00:00:00 2001 From: Francesco Mazzoli Date: Thu, 19 Apr 2012 14:14:19 +0100 Subject: Checking for existance of DLX in dead_letter_publish --- src/rabbit_amqqueue_process.erl | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2063e557..fe8fac45 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -724,13 +724,6 @@ dead_letter_fun(Reason, _State) -> gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}) end. -dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) -> - case rabbit_exchange:lookup(DLX) of - {error, not_found} -> noreply(State); - _ -> dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State) - end. - dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo, dlx = DLX}) -> @@ -738,17 +731,20 @@ dead_letter_publish(Msg, Reason, rabbit_basic:delivery( false, false, make_dead_letter_msg(DLX, Reason, Msg, State), MsgSeqNo), - {ok, X} = rabbit_exchange:lookup(XName), - Queues = rabbit_exchange:route(X, Delivery), - {Queues1, Cycles} = detect_dead_letter_cycles(Delivery, Queues), - lists:foreach(fun log_cycle_once/1, Cycles), - QPids = rabbit_amqqueue:lookup(Queues1), - {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery), - DeliveredQPids. - -dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State = #q{publish_seqno = MsgSeqNo, - unconfirmed = UC}) -> + case rabbit_exchange:lookup(XName) of + {ok, X} -> + Queues = rabbit_exchange:route(X, Delivery), + {Queues1, Cycles} = detect_dead_letter_cycles(Delivery, Queues), + lists:foreach(fun log_cycle_once/1, Cycles), + QPids = rabbit_amqqueue:lookup(Queues1), + {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery), + DeliveredQPids; + {error, not_found} -> + [] + end. + +dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo, + unconfirmed = UC}) -> QPids = dead_letter_publish(Msg, Reason, State), State1 = State#q{queue_monitors = pmon:monitor_all( QPids, State#q.queue_monitors), -- cgit v1.2.1 From 026e68eef57bcc9905dda740664be393493d7795 Mon Sep 17 00:00:00 2001 From: Francesco Mazzoli Date: Thu, 19 Apr 2012 15:16:40 +0100 Subject: ack_nacked_messages -> ack_if_no_dlx --- src/rabbit_amqqueue_process.erl | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2764c027..0703bffc 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -717,12 +717,12 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. -ack_nacked_messages(AckTags, State = #q{dlx = undefined, - backing_queue = BQ, - backing_queue_state = BQS }) -> +ack_if_no_dlx(AckTags, State = #q{dlx = undefined, + backing_queue = BQ, + backing_queue_state = BQS }) -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), State#q{backing_queue_state = BQS1}; -ack_nacked_messages(_AckTags, State) -> +ack_if_no_dlx(_AckTags, State) -> State. dead_letter_fun(_Reason, #q{dlx = undefined}) -> @@ -1241,7 +1241,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> BQS1 = BQ:fold(Fun, BQS, AckTags), - ack_nacked_messages( + ack_if_no_dlx( AckTags, State1#q{backing_queue_state = BQS1}) end end)); -- cgit v1.2.1 From a656a34b97f863389cd4ba9acc5a39fdb0ef34dc Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 19 Apr 2012 16:19:17 +0100 Subject: In general, avoid unnecessary interleaving of mutating state --- src/rabbit_amqqueue_process.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0703bffc..07c86672 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1237,9 +1237,9 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> true -> fun (State1) -> requeue_and_run(AckTags, State1) end; false -> - Fun = dead_letter_fun(rejected, State), fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + Fun = dead_letter_fun(rejected, State1), BQS1 = BQ:fold(Fun, BQS, AckTags), ack_if_no_dlx( AckTags, State1#q{backing_queue_state = BQS1}) -- cgit v1.2.1 From 3105f1a500d2d957083e564d7405ce1d4a50cd40 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 19 Apr 2012 17:03:12 +0100 Subject: cosmetic --- src/rabbit_amqqueue_process.erl | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 064df26f..19333fb9 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1241,16 +1241,15 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> noreply(subtract_acks( ChPid, AckTags, State, case Requeue of - true -> - fun (State1) -> requeue_and_run(AckTags, State1) end; - false -> - fun (State1 = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - Fun = dead_letter_fun(rejected, State1), - BQS1 = BQ:fold(Fun, BQS, AckTags), - ack_if_no_dlx( - AckTags, State1#q{backing_queue_state = BQS1}) - end + true -> fun (State1) -> requeue_and_run(AckTags, State1) end; + false -> fun (State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + Fun = dead_letter_fun(rejected, State1), + BQS1 = BQ:fold(Fun, BQS, AckTags), + ack_if_no_dlx( + AckTags, + State1#q{backing_queue_state = BQS1}) + end end)); handle_cast(delete_immediately, State) -> -- cgit v1.2.1 From 447b4219c71af18ae23b092ae48331559e90169e Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 20 Apr 2012 13:28:42 +0100 Subject: simplifying refactor --- src/rabbit_amqqueue_process.erl | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fe8fac45..526a8b1d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -724,19 +724,16 @@ dead_letter_fun(Reason, _State) -> gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}) end. -dead_letter_publish(Msg, Reason, - State = #q{publish_seqno = MsgSeqNo, - dlx = DLX}) -> - Delivery = #delivery{message = #basic_message{exchange_name = XName}} = - rabbit_basic:delivery( - false, false, make_dead_letter_msg(DLX, Reason, Msg, State), - MsgSeqNo), +dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) -> + DLMsg = #basic_message{exchange_name = XName} = + make_dead_letter_msg(Reason, Msg, State), case rabbit_exchange:lookup(XName) of {ok, X} -> - Queues = rabbit_exchange:route(X, Delivery), - {Queues1, Cycles} = detect_dead_letter_cycles(Delivery, Queues), + Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo), + {Queues, Cycles} = detect_dead_letter_cycles( + DLMsg, rabbit_exchange:route(X, Delivery)), lists:foreach(fun log_cycle_once/1, Cycles), - QPids = rabbit_amqqueue:lookup(Queues1), + QPids = rabbit_amqqueue:lookup(Queues), {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery), DeliveredQPids; {error, not_found} -> @@ -803,8 +800,7 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, false -> noreply(State1) end. -detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}}, - Queues) -> +detect_dead_letter_cycles(#basic_message{content = Content}, Queues) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), NoCycles = {Queues, []}, @@ -831,11 +827,11 @@ detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}} end end. -make_dead_letter_msg(DLX, Reason, +make_dead_letter_msg(Reason, Msg = #basic_message{content = Content, exchange_name = Exchange, routing_keys = RoutingKeys}, - State = #q{dlx_routing_key = DlxRoutingKey}) -> + State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) -> {DeathRoutingKeys, HeadersFun1} = case DlxRoutingKey of undefined -> {RoutingKeys, fun (H) -> H end}; -- cgit v1.2.1 From 74f676d5312f6d8d270001b4e2ed464bdb2ff7cd Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 20 Apr 2012 13:42:32 +0100 Subject: cosmetic --- src/rabbit_amqqueue_process.erl | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 04306ab3..6b825607 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -846,20 +846,20 @@ make_dead_letter_msg(Reason, _ -> {[DlxRoutingKey], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} end, + ReasonBin = list_to_binary(atom_to_list(Reason)), #resource{name = QName} = qname(State), + TimeSec = rabbit_misc:now_ms() div 1000, HeadersFun2 = fun (Headers) -> %% The first routing key is the one specified in the %% basic.publish; all others are CC or BCC keys. - RoutingKeys1 = - [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], - Info = [{<<"reason">>, - longstr, list_to_binary(atom_to_list(Reason))}, - {<<"queue">>, longstr, QName}, - {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000}, - {<<"exchange">>, longstr, Exchange#resource.name}, - {<<"routing-keys">>, array, - [{longstr, Key} || Key <- RoutingKeys1]}], + RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], + RKs1 = [{longstr, Key} || Key <- RKs], + Info = [{<<"reason">>, longstr, ReasonBin}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, TimeSec}, + {<<"exchange">>, longstr, Exchange#resource.name}, + {<<"routing-keys">>, array, RKs1}], HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>, Info, Headers)) end, -- cgit v1.2.1 From 66bd59a1f14d30b8d1a5aae6cc0077fd14889268 Mon Sep 17 00:00:00 2001 From: Francesco Mazzoli Date: Mon, 23 Apr 2012 11:43:29 +0100 Subject: Move node waiting to rabbit_control. That functionality doesn't have much to do with rabbit itself (we should leave it undocumented). --- Makefile | 2 +- src/rabbit.erl | 2 +- src/rabbit_control.erl | 85 +++++++++++++++++++++++++++++++++++++++++++++----- src/rabbit_misc.erl | 83 ------------------------------------------------ src/rabbit_nodes.erl | 9 +++++- wait_node | 22 ------------- 6 files changed, 87 insertions(+), 116 deletions(-) delete mode 100755 wait_node diff --git a/Makefile b/Makefile index 5da62956..49bf926a 100644 --- a/Makefile +++ b/Makefile @@ -207,7 +207,7 @@ start-background-node: all -rm -f $(RABBITMQ_MNESIA_DIR).pid mkdir -p $(RABBITMQ_MNESIA_DIR) setsid sh -c "$(MAKE) run-background-node > $(RABBITMQ_MNESIA_DIR)/startup_log 2> $(RABBITMQ_MNESIA_DIR)/startup_err" & - ./wait_node $(RABBITMQ_NODENAME) $(RABBITMQ_MNESIA_DIR).pid + ./scripts/rabbitmqctl -n $(RABBITMQ_NODENAME) wait $(RABBITMQ_MNESIA_DIR).pid kernel start-rabbit-on-node: all echo "rabbit:start()." | $(ERL_CALL) diff --git a/src/rabbit.erl b/src/rabbit.erl index bee2634d..b1f786a0 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -347,7 +347,7 @@ status() -> is_running() -> is_running(node()). is_running(Node) -> - rabbit_misc:is_running(Node, rabbit). + rabbit_nodes:is_running(Node, rabbit). environment() -> lists:keysort( diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 17a69725..40663f67 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -20,6 +20,7 @@ -export([start/0, stop/0, action/5]). -define(RPC_TIMEOUT, infinity). +-define(EXTERNAL_CHECK_INTERVAL, 1000). -define(QUIET_OPT, "-q"). -define(NODE_OPT, "-n"). @@ -156,8 +157,8 @@ action(stop, Node, Args, _Opts, Inform) -> Inform("Stopping and halting node ~p", [Node]), Res = call(Node, {rabbit, stop_and_halt, []}), case {Res, Args} of - {ok, [PidFile]} -> rabbit_misc:wait_for_process_death( - rabbit_misc:read_pid_file(PidFile, false)); + {ok, [PidFile]} -> wait_for_process_death( + read_pid_file(PidFile, false)); {ok, [_, _| _]} -> exit({badarg, Args}); _ -> ok end, @@ -191,9 +192,16 @@ action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) -> [Node, ClusterNodes]), rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]); -action(wait, Node, [PidFile], _Opts, Inform) -> +action(wait, Node, Args, _Opts, Inform) -> + {PidFile, Application} = + case Args of + [PidFile0] -> {PidFile0, rabbit}; + [PidFile0, AppString] -> {PidFile0, list_to_atom(AppString)} + end, Inform("Waiting for ~p", [Node]), - wait_for_application(Node, PidFile, Inform); + Pid = read_pid_file(PidFile, true), + Inform("pid is ~s", [Pid]), + wait_for_application(Node, Pid, Application); action(status, Node, [], _Opts, Inform) -> Inform("Status of node ~p", [Node]), @@ -378,10 +386,71 @@ action(eval, Node, [Expr], _Opts, _Inform) -> %%---------------------------------------------------------------------------- -wait_for_application(Node, PidFile, Inform) -> - Pid = rabbit_misc:read_pid_file(PidFile, true), - Inform("pid is ~s", [Pid]), - rabbit_misc:wait_for_application(Node, Pid, rabbit). +wait_for_application(Node, Pid, Application) -> + case process_up(Pid) of + true -> case rabbit_nodes:is_running(Node, Application) of + true -> ok; + false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), + wait_for_application(Node, Pid, Application) + end; + false -> {error, process_not_running} + end. + +wait_for_process_death(Pid) -> + case process_up(Pid) of + true -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), + wait_for_process_death(Pid); + false -> ok + end. + +read_pid_file(PidFile, Wait) -> + case {file:read_file(PidFile), Wait} of + {{ok, Bin}, _} -> + S = string:strip(binary_to_list(Bin), right, $\n), + try list_to_integer(S) + catch error:badarg -> + exit({error, {garbage_in_pid_file, PidFile}}) + end, + S; + {{error, enoent}, true} -> + timer:sleep(?EXTERNAL_CHECK_INTERVAL), + read_pid_file(PidFile, Wait); + {{error, _} = E, _} -> + exit({error, {could_not_read_pid, E}}) + end. + +% Test using some OS clunkiness since we shouldn't trust +% rpc:call(os, getpid, []) at this point +process_up(Pid) -> + with_os([{unix, fun () -> + system("ps -p " ++ Pid + ++ " >/dev/null 2>&1") =:= 0 + end}, + {win32, fun () -> + Res = os:cmd("tasklist /nh /fi \"pid eq " ++ + Pid ++ "\" 2>&1"), + case re:run(Res, "erl\\.exe", [{capture, none}]) of + match -> true; + _ -> false + end + end}]). + +with_os(Handlers) -> + {OsFamily, _} = os:type(), + case proplists:get_value(OsFamily, Handlers) of + undefined -> throw({unsupported_os, OsFamily}); + Handler -> Handler() + end. + +% Like system(3) +system(Cmd) -> + ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'", + Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]), + receive {Port, {exit_status, Status}} -> Status end. + +% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'" +escape_quotes(Cmd) -> + lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 9456e3c1..0aacd654 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -61,10 +61,6 @@ -export([quit/1]). -export([os_cmd/1]). -export([gb_sets_difference/2]). --export([is_running/2, wait_for_application/3, wait_for_process_death/1, - read_pid_file/2]). - --define(EXTERNAL_CHECK_INTERVAL, 1000). %%---------------------------------------------------------------------------- @@ -210,11 +206,6 @@ -spec(quit/1 :: (integer() | string()) -> no_return()). -spec(os_cmd/1 :: (string()) -> string()). -spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()). --spec(is_running/2 :: (node(), atom()) -> boolean()). --spec(wait_for_application/3 :: (node(), string(), atom()) - -> ok | {error, process_not_running}). --spec(wait_for_process_death/1 :: (string()) -> ok). --spec(read_pid_file/2 :: (file:name(), boolean()) -> string() | no_return()). -endif. @@ -926,77 +917,3 @@ os_cmd(Command) -> gb_sets_difference(S1, S2) -> gb_sets:fold(fun gb_sets:delete_any/2, S1, S2). - -%%---------------------------------------------------------------------------- - -is_running(Node, Application) -> - case rpc:call(Node, application, which_applications, [infinity]) of - {badrpc, _} -> false; - Apps -> proplists:is_defined(Application, Apps) - end. - -wait_for_application(Node, Pid, Application) -> - case process_up(Pid) of - true -> case is_running(Node, Application) of - true -> ok; - false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), - wait_for_application(Node, Pid, Application) - end; - false -> {error, process_not_running} - end. - -wait_for_process_death(Pid) -> - case process_up(Pid) of - true -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), - wait_for_process_death(Pid); - false -> ok - end. - -read_pid_file(PidFile, Wait) -> - case {file:read_file(PidFile), Wait} of - {{ok, Bin}, _} -> - S = string:strip(binary_to_list(Bin), right, $\n), - try list_to_integer(S) - catch error:badarg -> - exit({error, {garbage_in_pid_file, PidFile}}) - end, - S; - {{error, enoent}, true} -> - timer:sleep(?EXTERNAL_CHECK_INTERVAL), - read_pid_file(PidFile, Wait); - {{error, _} = E, _} -> - exit({error, {could_not_read_pid, E}}) - end. - -% Test using some OS clunkiness since we shouldn't trust -% rpc:call(os, getpid, []) at this point -process_up(Pid) -> - with_os([{unix, fun () -> - system("ps -p " ++ Pid - ++ " >/dev/null 2>&1") =:= 0 - end}, - {win32, fun () -> - Res = os:cmd("tasklist /nh /fi \"pid eq " ++ - Pid ++ "\" 2>&1"), - case re:run(Res, "erl\\.exe", [{capture, none}]) of - match -> true; - _ -> false - end - end}]). - -with_os(Handlers) -> - {OsFamily, _} = os:type(), - case proplists:get_value(OsFamily, Handlers) of - undefined -> throw({unsupported_os, OsFamily}); - Handler -> Handler() - end. - -% Like system(3) -system(Cmd) -> - ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'", - Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]), - receive {Port, {exit_status, Status}} -> Status end. - -% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'" -escape_quotes(Cmd) -> - lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index 9a972d9e..b6a9e263 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -16,7 +16,7 @@ -module(rabbit_nodes). --export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0]). +-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, is_running/2]). -define(EPMD_TIMEOUT, 30000). @@ -32,6 +32,7 @@ -spec(make/1 :: ({string(), string()} | string()) -> node()). -spec(parts/1 :: (node() | string()) -> {string(), string()}). -spec(cookie_hash/0 :: () -> string()). +-spec(is_running/2 :: (node(), atom()) -> boolean()). -endif. @@ -92,3 +93,9 @@ parts(NodeStr) -> cookie_hash() -> base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))). + +is_running(Node, Application) -> + case rpc:call(Node, application, which_applications, [infinity]) of + {badrpc, _} -> false; + Apps -> proplists:is_defined(Application, Apps) + end. diff --git a/wait_node b/wait_node deleted file mode 100755 index ee7dfeab..00000000 --- a/wait_node +++ /dev/null @@ -1,22 +0,0 @@ -#!/usr/bin/env escript -%% -*- erlang -*- -%%! -sname wait_node -pa ./ebin - -main([NodeStr, PidFile]) -> - case {code:load_file(rabbit_misc), code:load_file(rabbit_nodes)} of - {{module, _}, {module, _}} -> - ok; - _ -> - io:format("Compile with 'make' before running this script~n"), - halt(1) - end, - Node = rabbit_nodes:make(NodeStr), - Pid = rabbit_misc:read_pid_file(PidFile, true), - io:format("pid is ~s~n", [Pid]), - rabbit_misc:wait_for_application(Node, Pid, kernel); -main(_) -> - usage(). - -usage() -> - io:format("Usage: ./wait_node node pidfile~n"), - halt(1). -- cgit v1.2.1 From 31dece5bbc7d7832f3766f23e9d99d891df4e75e Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Mon, 23 Apr 2012 12:54:56 +0100 Subject: cosmetic --- src/rabbit_mirror_queue_coordinator.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 2d155d14..17e2ffb4 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -356,7 +356,7 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) -> handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }). -handle_info(send_gm_heartbeat, State = #state{gm = GM}) -> +handle_info(send_gm_heartbeat, State = #state { gm = GM }) -> gm:broadcast(GM, heartbeat), ensure_gm_heartbeat(), noreply(State); -- cgit v1.2.1 From 8419ee59ff50511be70b11ac91f1350b696b5eac Mon Sep 17 00:00:00 2001 From: Steve Powell Date: Mon, 23 Apr 2012 13:15:26 +0100 Subject: Add ulimit default file to debian package installation and source it on server start. --- packaging/common/rabbitmq-server.init | 2 ++ .../debs/Debian/debian/rabbitmq-server.default | 26 ++++++++++++++++++++++ packaging/debs/Debian/debian/rules | 1 + 3 files changed, 29 insertions(+) create mode 100644 packaging/debs/Debian/debian/rabbitmq-server.default diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init index c942f8e3..40238c8e 100644 --- a/packaging/common/rabbitmq-server.init +++ b/packaging/common/rabbitmq-server.init @@ -35,6 +35,8 @@ test -x $CONTROL || exit 0 RETVAL=0 set -e +[ -f /etc/default/${NAME} ] && . /etc/default/${NAME} + ensure_pid_dir () { PID_DIR=`dirname ${PID_FILE}` if [ ! -d ${PID_DIR} ] ; then diff --git a/packaging/debs/Debian/debian/rabbitmq-server.default b/packaging/debs/Debian/debian/rabbitmq-server.default new file mode 100644 index 00000000..6efa1b98 --- /dev/null +++ b/packaging/debs/Debian/debian/rabbitmq-server.default @@ -0,0 +1,26 @@ +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License +## at http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +## the License for the specific language governing rights and +## limitations under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developer of the Original Code is VMware, Inc. +## Copyright (c) 2012 VMware, Inc. All rights reserved. + +# This file is /etc/default/rabbitmq-server and is designed to allow +# adjustment of system limits for the rabbitmq-server service process. + +# Refer to the system documentation for ulimit (in man bash). + +# open file handles +#ulimit -n 1024 + +# The rabbitmq-server service must be restarted for these settings to +# be used. This file is installed when the package is installed and is +# not preserved on upgrade. \ No newline at end of file diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index 108b1ed5..16f68931 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -19,3 +19,4 @@ install/rabbitmq-server:: done sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' debian/postrm install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server + install -p -D -m 0755 debian/rabbitmq-server.default $(DEB_DESTDIR)etc/default/rabbitmq-server -- cgit v1.2.1 From fe8baaf0e545efd06af1d6e7ee569c4ab249e951 Mon Sep 17 00:00:00 2001 From: Francesco Mazzoli Date: Mon, 23 Apr 2012 13:26:19 +0100 Subject: fix type signature for rabbit_basic:map_headers/2 --- src/rabbit_basic.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 8ad59016..17d848da 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -63,7 +63,7 @@ -spec(extract_headers/1 :: (rabbit_types:content()) -> headers()). --spec(map_headers/2 :: (rabbit_types:content(), fun((headers()) -> headers())) +-spec(map_headers/2 :: (fun((headers()) -> headers()), rabbit_types:content()) -> rabbit_types:content()). -spec(header_routes/1 :: -- cgit v1.2.1 From 9a74933ce9c42283db8b7470ea77929fb35fc37b Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 23 Apr 2012 16:58:44 +0100 Subject: I don't think we need a licence header, and it's intimidating. Rephrase a bit too. --- .../debs/Debian/debian/rabbitmq-server.default | 33 ++++++---------------- 1 file changed, 8 insertions(+), 25 deletions(-) diff --git a/packaging/debs/Debian/debian/rabbitmq-server.default b/packaging/debs/Debian/debian/rabbitmq-server.default index 6efa1b98..bde5e308 100644 --- a/packaging/debs/Debian/debian/rabbitmq-server.default +++ b/packaging/debs/Debian/debian/rabbitmq-server.default @@ -1,26 +1,9 @@ -## The contents of this file are subject to the Mozilla Public License -## Version 1.1 (the "License"); you may not use this file except in -## compliance with the License. You may obtain a copy of the License -## at http://www.mozilla.org/MPL/ -## -## Software distributed under the License is distributed on an "AS IS" -## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -## the License for the specific language governing rights and -## limitations under the License. -## -## The Original Code is RabbitMQ. -## -## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2012 VMware, Inc. All rights reserved. - -# This file is /etc/default/rabbitmq-server and is designed to allow -# adjustment of system limits for the rabbitmq-server service process. - -# Refer to the system documentation for ulimit (in man bash). - -# open file handles +# This file is sourced by /etc/init.d/rabbitmq-server. Its primary +# reason for existing is to allow adjustment of system limits for the +# rabbitmq-server process. +# +# Maximum number of open file handles. This will need to be increased +# to handle many simultaneous connections. Refer to the system +# documentation for ulimit (in man bash) for more information. +# #ulimit -n 1024 - -# The rabbitmq-server service must be restarted for these settings to -# be used. This file is installed when the package is installed and is -# not preserved on upgrade. \ No newline at end of file -- cgit v1.2.1 From 498767aef58b1050b2253f4860bedd11f1a13905 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 23 Apr 2012 16:59:16 +0100 Subject: 755 is wrong, this should not be executable. Somehow it ends up not executable anyway, but we shouldn't say it is... --- packaging/debs/Debian/debian/rules | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index 16f68931..ecb778df 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -19,4 +19,4 @@ install/rabbitmq-server:: done sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' debian/postrm install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server - install -p -D -m 0755 debian/rabbitmq-server.default $(DEB_DESTDIR)etc/default/rabbitmq-server + install -p -D -m 0644 debian/rabbitmq-server.default $(DEB_DESTDIR)etc/default/rabbitmq-server -- cgit v1.2.1 From f31f2cfbdf1c6c2f1895110c4fb22835a0ea276b Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 24 Apr 2012 12:59:46 +0100 Subject: Cosmetic. Ish. --- src/rabbit_control.erl | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 40663f67..8b24d2e3 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -192,16 +192,13 @@ action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) -> [Node, ClusterNodes]), rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]); -action(wait, Node, Args, _Opts, Inform) -> - {PidFile, Application} = - case Args of - [PidFile0] -> {PidFile0, rabbit}; - [PidFile0, AppString] -> {PidFile0, list_to_atom(AppString)} - end, +action(wait, Node, [PidFile], _Opts, Inform) -> Inform("Waiting for ~p", [Node]), - Pid = read_pid_file(PidFile, true), - Inform("pid is ~s", [Pid]), - wait_for_application(Node, Pid, Application); + wait_for_application(Node, PidFile, rabbit, Inform); + +action(wait, Node, [PidFile, App], _Opts, Inform) -> + Inform("Waiting for ~p on ~p", [App, Node]), + wait_for_application(Node, PidFile, list_to_atom(App), Inform); action(status, Node, [], _Opts, Inform) -> Inform("Status of node ~p", [Node]), @@ -386,6 +383,11 @@ action(eval, Node, [Expr], _Opts, _Inform) -> %%---------------------------------------------------------------------------- +wait_for_application(Node, PidFile, Application, Inform) -> + Pid = read_pid_file(PidFile, true), + Inform("pid is ~s", [Pid]), + wait_for_application(Node, Pid, Application). + wait_for_application(Node, Pid, Application) -> case process_up(Pid) of true -> case rabbit_nodes:is_running(Node, Application) of -- cgit v1.2.1