diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-04-25 12:24:40 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-04-25 12:24:40 +0100 |
commit | 0f95632fe3adff3f1df0a02e25d2096e0717e151 (patch) | |
tree | 6fac220c3f712f7c8cd29314f6a17a850ea65f1c | |
parent | e1c52e7484a1abb3531e65c32c2d3d1896456d63 (diff) | |
parent | 4aea09ab8900c2be0b21a17214a2209121b4549b (diff) | |
download | rabbitmq-server-0f95632fe3adff3f1df0a02e25d2096e0717e151.tar.gz |
Merge bug24884
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | packaging/common/rabbitmq-server.init | 2 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/rabbitmq-server.default | 9 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/rules | 1 | ||||
-rw-r--r-- | src/rabbit.erl | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 80 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 41 | ||||
-rw-r--r-- | src/rabbit_control.erl | 16 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 2 | ||||
-rw-r--r-- | src/rabbit_nodes.erl | 9 |
11 files changed, 103 insertions, 66 deletions
@@ -207,7 +207,7 @@ start-background-node: all -rm -f $(RABBITMQ_MNESIA_DIR).pid mkdir -p $(RABBITMQ_MNESIA_DIR) setsid sh -c "$(MAKE) run-background-node > $(RABBITMQ_MNESIA_DIR)/startup_log 2> $(RABBITMQ_MNESIA_DIR)/startup_err" & - sleep 1 + ./scripts/rabbitmqctl -n $(RABBITMQ_NODENAME) wait $(RABBITMQ_MNESIA_DIR).pid kernel start-rabbit-on-node: all echo "rabbit:start()." | $(ERL_CALL) diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init index c942f8e3..40238c8e 100644 --- a/packaging/common/rabbitmq-server.init +++ b/packaging/common/rabbitmq-server.init @@ -35,6 +35,8 @@ test -x $CONTROL || exit 0 RETVAL=0 set -e +[ -f /etc/default/${NAME} ] && . /etc/default/${NAME} + ensure_pid_dir () { PID_DIR=`dirname ${PID_FILE}` if [ ! -d ${PID_DIR} ] ; then diff --git a/packaging/debs/Debian/debian/rabbitmq-server.default b/packaging/debs/Debian/debian/rabbitmq-server.default new file mode 100644 index 00000000..bde5e308 --- /dev/null +++ b/packaging/debs/Debian/debian/rabbitmq-server.default @@ -0,0 +1,9 @@ +# This file is sourced by /etc/init.d/rabbitmq-server. Its primary +# reason for existing is to allow adjustment of system limits for the +# rabbitmq-server process. +# +# Maximum number of open file handles. This will need to be increased +# to handle many simultaneous connections. Refer to the system +# documentation for ulimit (in man bash) for more information. +# +#ulimit -n 1024 diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index 108b1ed5..ecb778df 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -19,3 +19,4 @@ install/rabbitmq-server:: done sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server + install -p -D -m 0644 debian/rabbitmq-server.default $(DEB_DESTDIR)etc/default/rabbitmq-server diff --git a/src/rabbit.erl b/src/rabbit.erl index eec7e34e..b1f786a0 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -347,10 +347,7 @@ status() -> is_running() -> is_running(node()). is_running(Node) -> - case rpc:call(Node, application, which_applications, [infinity]) of - {badrpc, _} -> false; - Apps -> proplists:is_defined(rabbit, Apps) - end. + rabbit_nodes:is_running(Node, rabbit). environment() -> lists:keysort( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0cf7de40..5701efeb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -723,6 +723,14 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. +ack_if_no_dlx(AckTags, State = #q{dlx = undefined, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State#q{backing_queue_state = BQS1}; +ack_if_no_dlx(_AckTags, State) -> + State. + dead_letter_fun(_Reason, #q{dlx = undefined}) -> undefined; dead_letter_fun(Reason, _State) -> @@ -730,31 +738,24 @@ dead_letter_fun(Reason, _State) -> gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}) end. -dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) -> - case rabbit_exchange:lookup(DLX) of - {error, not_found} -> noreply(State); - _ -> dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State) +dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) -> + DLMsg = #basic_message{exchange_name = XName} = + make_dead_letter_msg(Reason, Msg, State), + case rabbit_exchange:lookup(XName) of + {ok, X} -> + Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo), + {Queues, Cycles} = detect_dead_letter_cycles( + DLMsg, rabbit_exchange:route(X, Delivery)), + lists:foreach(fun log_cycle_once/1, Cycles), + QPids = rabbit_amqqueue:lookup(Queues), + {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery), + DeliveredQPids; + {error, not_found} -> + [] end. -dead_letter_publish(Msg, Reason, - State = #q{publish_seqno = MsgSeqNo, - dlx = DLX}) -> - Delivery = #delivery{message = #basic_message{exchange_name = XName}} = - rabbit_basic:delivery( - false, false, make_dead_letter_msg(DLX, Reason, Msg, State), - MsgSeqNo), - {ok, X} = rabbit_exchange:lookup(XName), - Queues = rabbit_exchange:route(X, Delivery), - {Queues1, Cycles} = detect_dead_letter_cycles(Delivery, Queues), - lists:foreach(fun log_cycle_once/1, Cycles), - QPids = rabbit_amqqueue:lookup(Queues1), - {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery), - DeliveredQPids. - -dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State = #q{publish_seqno = MsgSeqNo, - unconfirmed = UC}) -> +dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo, + unconfirmed = UC}) -> QPids = dead_letter_publish(Msg, Reason, State), State1 = State#q{queue_monitors = pmon:monitor_all( QPids, State#q.queue_monitors), @@ -813,8 +814,7 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, false -> noreply(State1) end. -detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}}, - Queues) -> +detect_dead_letter_cycles(#basic_message{content = Content}, Queues) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), NoCycles = {Queues, []}, @@ -841,31 +841,31 @@ detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}} end end. -make_dead_letter_msg(DLX, Reason, +make_dead_letter_msg(Reason, Msg = #basic_message{content = Content, exchange_name = Exchange, routing_keys = RoutingKeys}, - State = #q{dlx_routing_key = DlxRoutingKey}) -> + State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) -> {DeathRoutingKeys, HeadersFun1} = case DlxRoutingKey of undefined -> {RoutingKeys, fun (H) -> H end}; _ -> {[DlxRoutingKey], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} end, + ReasonBin = list_to_binary(atom_to_list(Reason)), #resource{name = QName} = qname(State), + TimeSec = rabbit_misc:now_ms() div 1000, HeadersFun2 = fun (Headers) -> %% The first routing key is the one specified in the %% basic.publish; all others are CC or BCC keys. - RoutingKeys1 = - [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], - Info = [{<<"reason">>, - longstr, list_to_binary(atom_to_list(Reason))}, - {<<"queue">>, longstr, QName}, - {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000}, - {<<"exchange">>, longstr, Exchange#resource.name}, - {<<"routing-keys">>, array, - [{longstr, Key} || Key <- RoutingKeys1]}], + RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], + RKs1 = [{longstr, Key} || Key <- RKs], + Info = [{<<"reason">>, longstr, ReasonBin}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, TimeSec}, + {<<"exchange">>, longstr, Exchange#resource.name}, + {<<"routing-keys">>, array, RKs1}], HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>, Info, Headers)) end, @@ -1240,11 +1240,13 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> ChPid, AckTags, State, case Requeue of true -> fun (State1) -> requeue_and_run(AckTags, State1) end; - false -> Fun = dead_letter_fun(rejected, State), - fun (State1 = #q{backing_queue = BQ, + false -> fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + Fun = dead_letter_fun(rejected, State1), BQS1 = BQ:fold(Fun, BQS, AckTags), - State1#q{backing_queue_state = BQS1} + ack_if_no_dlx( + AckTags, + State1#q{backing_queue_state = BQS1}) end end)); diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 8ad59016..17d848da 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -63,7 +63,7 @@ -spec(extract_headers/1 :: (rabbit_types:content()) -> headers()). --spec(map_headers/2 :: (rabbit_types:content(), fun((headers()) -> headers())) +-spec(map_headers/2 :: (fun((headers()) -> headers()), rabbit_types:content()) -> rabbit_types:content()). -spec(header_routes/1 :: diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 846890a1..22c6a223 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,9 +36,9 @@ conn_name, limiter, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, virtual_host, most_recently_declared_queue, queue_monitors, - consumer_mapping, blocking, queue_consumers, queue_collector_pid, - stats_timer, confirm_enabled, publish_seqno, unconfirmed, - confirmed, capabilities, trace_state}). + consumer_mapping, blocking, queue_consumers, delivering_queues, + queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, + unconfirmed, confirmed, capabilities, trace_state}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -198,6 +198,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, consumer_mapping = dict:new(), blocking = sets:new(), queue_consumers = dict:new(), + delivering_queues = sets:new(), queue_collector_pid = CollectorPid, confirm_enabled = false, publish_seqno = 1, @@ -331,10 +332,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), State2 = queue_blocked(QPid, State1), State3 = handle_consuming_queue_down(QPid, State2), + State4 = handle_delivering_queue_down(QPid, State3), credit_flow:peer_down(QPid), erase_queue_stats(QPid), noreply(State3#ch{queue_monitors = pmon:erase( - QPid, State3#ch.queue_monitors)}); + QPid, State4#ch.queue_monitors)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -657,7 +659,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, - Msg = {_QName, _QPid, _MsgId, Redelivered, + Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}}} -> @@ -669,7 +671,8 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, message_count = MessageCount}, Content), - {noreply, record_sent(none, not(NoAck), Msg, State)}; + State1 = monitor_delivering_queue(NoAck, QPid, State), + {noreply, record_sent(none, not(NoAck), Msg, State1)}; empty -> {reply, #'basic.get_empty'{}, State} end; @@ -707,10 +710,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ActualConsumerTag})), Q} end) of - {ok, Q} -> - State1 = State#ch{consumer_mapping = - dict:store(ActualConsumerTag, Q, - ConsumerMapping)}, + {ok, Q = #amqqueue{pid = QPid}} -> + CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping), + State1 = monitor_delivering_queue( + NoAck, QPid, State#ch{consumer_mapping = CM1}), {noreply, case NoWait of true -> consumer_monitor(ActualConsumerTag, State1); @@ -1108,6 +1111,13 @@ consumer_monitor(ConsumerTag, State end. +monitor_delivering_queue(true, _QPid, State) -> + State; +monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons, + delivering_queues = DQ}) -> + State#ch{queue_monitors = pmon:monitor(QPid, QMons), + delivering_queues = sets:add_element(QPid, DQ)}. + handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> case rabbit_misc:is_abnormal_termination(Reason) of true -> {MXs, UC1} = dtree:take_all(QPid, UC), @@ -1134,6 +1144,9 @@ handle_consuming_queue_down(QPid, State#ch{consumer_mapping = ConsumerMapping1, queue_consumers = dict:erase(QPid, QCons)}. +handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> + State#ch{delivering_queues = sets:del_element(QPid, DQ)}. + binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, @@ -1269,9 +1282,11 @@ new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), notify_queues(State = #ch{state = closing}) -> {ok, State}; -notify_queues(State = #ch{consumer_mapping = Consumers}) -> - {rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()), - State#ch{state = closing}}. +notify_queues(State = #ch{consumer_mapping = Consumers, + delivering_queues = DQ }) -> + QPids = sets:to_list( + sets:union(sets:from_list(consumer_queues(Consumers)), DQ)), + {rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}. fold_per_queue(_F, Acc, []) -> Acc; diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 51f88c8f..8b24d2e3 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -194,7 +194,11 @@ action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) -> action(wait, Node, [PidFile], _Opts, Inform) -> Inform("Waiting for ~p", [Node]), - wait_for_application(Node, PidFile, Inform); + wait_for_application(Node, PidFile, rabbit, Inform); + +action(wait, Node, [PidFile, App], _Opts, Inform) -> + Inform("Waiting for ~p on ~p", [App, Node]), + wait_for_application(Node, PidFile, list_to_atom(App), Inform); action(status, Node, [], _Opts, Inform) -> Inform("Status of node ~p", [Node]), @@ -379,17 +383,17 @@ action(eval, Node, [Expr], _Opts, _Inform) -> %%---------------------------------------------------------------------------- -wait_for_application(Node, PidFile, Inform) -> +wait_for_application(Node, PidFile, Application, Inform) -> Pid = read_pid_file(PidFile, true), Inform("pid is ~s", [Pid]), - wait_for_application(Node, Pid). + wait_for_application(Node, Pid, Application). -wait_for_application(Node, Pid) -> +wait_for_application(Node, Pid, Application) -> case process_up(Pid) of - true -> case rabbit:is_running(Node) of + true -> case rabbit_nodes:is_running(Node, Application) of true -> ok; false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), - wait_for_application(Node, Pid) + wait_for_application(Node, Pid, Application) end; false -> {error, process_not_running} end. diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 2d155d14..17e2ffb4 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -356,7 +356,7 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) -> handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }). -handle_info(send_gm_heartbeat, State = #state{gm = GM}) -> +handle_info(send_gm_heartbeat, State = #state { gm = GM }) -> gm:broadcast(GM, heartbeat), ensure_gm_heartbeat(), noreply(State); diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index 9a972d9e..b6a9e263 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -16,7 +16,7 @@ -module(rabbit_nodes). --export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0]). +-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, is_running/2]). -define(EPMD_TIMEOUT, 30000). @@ -32,6 +32,7 @@ -spec(make/1 :: ({string(), string()} | string()) -> node()). -spec(parts/1 :: (node() | string()) -> {string(), string()}). -spec(cookie_hash/0 :: () -> string()). +-spec(is_running/2 :: (node(), atom()) -> boolean()). -endif. @@ -92,3 +93,9 @@ parts(NodeStr) -> cookie_hash() -> base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))). + +is_running(Node, Application) -> + case rpc:call(Node, application, which_applications, [infinity]) of + {badrpc, _} -> false; + Apps -> proplists:is_defined(Application, Apps) + end. |