summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-04-25 10:52:47 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-04-25 10:52:47 +0100
commit63a6a40f5beff915b3e8d0512f04bfbfded1ea2c (patch)
tree6fac220c3f712f7c8cd29314f6a17a850ea65f1c
parente1c52e7484a1abb3531e65c32c2d3d1896456d63 (diff)
parent4aea09ab8900c2be0b21a17214a2209121b4549b (diff)
downloadrabbitmq-server-bug24884.tar.gz
merge defaultbug24884
-rw-r--r--Makefile2
-rw-r--r--packaging/common/rabbitmq-server.init2
-rw-r--r--packaging/debs/Debian/debian/rabbitmq-server.default9
-rw-r--r--packaging/debs/Debian/debian/rules1
-rw-r--r--src/rabbit.erl5
-rw-r--r--src/rabbit_amqqueue_process.erl80
-rw-r--r--src/rabbit_basic.erl2
-rw-r--r--src/rabbit_channel.erl41
-rw-r--r--src/rabbit_control.erl16
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl2
-rw-r--r--src/rabbit_nodes.erl9
11 files changed, 103 insertions, 66 deletions
diff --git a/Makefile b/Makefile
index db7462a6..49bf926a 100644
--- a/Makefile
+++ b/Makefile
@@ -207,7 +207,7 @@ start-background-node: all
-rm -f $(RABBITMQ_MNESIA_DIR).pid
mkdir -p $(RABBITMQ_MNESIA_DIR)
setsid sh -c "$(MAKE) run-background-node > $(RABBITMQ_MNESIA_DIR)/startup_log 2> $(RABBITMQ_MNESIA_DIR)/startup_err" &
- sleep 1
+ ./scripts/rabbitmqctl -n $(RABBITMQ_NODENAME) wait $(RABBITMQ_MNESIA_DIR).pid kernel
start-rabbit-on-node: all
echo "rabbit:start()." | $(ERL_CALL)
diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init
index c942f8e3..40238c8e 100644
--- a/packaging/common/rabbitmq-server.init
+++ b/packaging/common/rabbitmq-server.init
@@ -35,6 +35,8 @@ test -x $CONTROL || exit 0
RETVAL=0
set -e
+[ -f /etc/default/${NAME} ] && . /etc/default/${NAME}
+
ensure_pid_dir () {
PID_DIR=`dirname ${PID_FILE}`
if [ ! -d ${PID_DIR} ] ; then
diff --git a/packaging/debs/Debian/debian/rabbitmq-server.default b/packaging/debs/Debian/debian/rabbitmq-server.default
new file mode 100644
index 00000000..bde5e308
--- /dev/null
+++ b/packaging/debs/Debian/debian/rabbitmq-server.default
@@ -0,0 +1,9 @@
+# This file is sourced by /etc/init.d/rabbitmq-server. Its primary
+# reason for existing is to allow adjustment of system limits for the
+# rabbitmq-server process.
+#
+# Maximum number of open file handles. This will need to be increased
+# to handle many simultaneous connections. Refer to the system
+# documentation for ulimit (in man bash) for more information.
+#
+#ulimit -n 1024
diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules
index 108b1ed5..ecb778df 100644
--- a/packaging/debs/Debian/debian/rules
+++ b/packaging/debs/Debian/debian/rules
@@ -19,3 +19,4 @@ install/rabbitmq-server::
done
sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm
install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server
+ install -p -D -m 0644 debian/rabbitmq-server.default $(DEB_DESTDIR)etc/default/rabbitmq-server
diff --git a/src/rabbit.erl b/src/rabbit.erl
index eec7e34e..b1f786a0 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -347,10 +347,7 @@ status() ->
is_running() -> is_running(node()).
is_running(Node) ->
- case rpc:call(Node, application, which_applications, [infinity]) of
- {badrpc, _} -> false;
- Apps -> proplists:is_defined(rabbit, Apps)
- end.
+ rabbit_nodes:is_running(Node, rabbit).
environment() ->
lists:keysort(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0cf7de40..5701efeb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -723,6 +723,14 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
+ack_if_no_dlx(AckTags, State = #q{dlx = undefined,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State#q{backing_queue_state = BQS1};
+ack_if_no_dlx(_AckTags, State) ->
+ State.
+
dead_letter_fun(_Reason, #q{dlx = undefined}) ->
undefined;
dead_letter_fun(Reason, _State) ->
@@ -730,31 +738,24 @@ dead_letter_fun(Reason, _State) ->
gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason})
end.
-dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) ->
- case rabbit_exchange:lookup(DLX) of
- {error, not_found} -> noreply(State);
- _ -> dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
- State)
+dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) ->
+ DLMsg = #basic_message{exchange_name = XName} =
+ make_dead_letter_msg(Reason, Msg, State),
+ case rabbit_exchange:lookup(XName) of
+ {ok, X} ->
+ Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo),
+ {Queues, Cycles} = detect_dead_letter_cycles(
+ DLMsg, rabbit_exchange:route(X, Delivery)),
+ lists:foreach(fun log_cycle_once/1, Cycles),
+ QPids = rabbit_amqqueue:lookup(Queues),
+ {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
+ DeliveredQPids;
+ {error, not_found} ->
+ []
end.
-dead_letter_publish(Msg, Reason,
- State = #q{publish_seqno = MsgSeqNo,
- dlx = DLX}) ->
- Delivery = #delivery{message = #basic_message{exchange_name = XName}} =
- rabbit_basic:delivery(
- false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
- MsgSeqNo),
- {ok, X} = rabbit_exchange:lookup(XName),
- Queues = rabbit_exchange:route(X, Delivery),
- {Queues1, Cycles} = detect_dead_letter_cycles(Delivery, Queues),
- lists:foreach(fun log_cycle_once/1, Cycles),
- QPids = rabbit_amqqueue:lookup(Queues1),
- {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
- DeliveredQPids.
-
-dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
- State = #q{publish_seqno = MsgSeqNo,
- unconfirmed = UC}) ->
+dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo,
+ unconfirmed = UC}) ->
QPids = dead_letter_publish(Msg, Reason, State),
State1 = State#q{queue_monitors = pmon:monitor_all(
QPids, State#q.queue_monitors),
@@ -813,8 +814,7 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
false -> noreply(State1)
end.
-detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}},
- Queues) ->
+detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
#content{properties = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
NoCycles = {Queues, []},
@@ -841,31 +841,31 @@ detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}}
end
end.
-make_dead_letter_msg(DLX, Reason,
+make_dead_letter_msg(Reason,
Msg = #basic_message{content = Content,
exchange_name = Exchange,
routing_keys = RoutingKeys},
- State = #q{dlx_routing_key = DlxRoutingKey}) ->
+ State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) ->
{DeathRoutingKeys, HeadersFun1} =
case DlxRoutingKey of
undefined -> {RoutingKeys, fun (H) -> H end};
_ -> {[DlxRoutingKey],
fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
end,
+ ReasonBin = list_to_binary(atom_to_list(Reason)),
#resource{name = QName} = qname(State),
+ TimeSec = rabbit_misc:now_ms() div 1000,
HeadersFun2 =
fun (Headers) ->
%% The first routing key is the one specified in the
%% basic.publish; all others are CC or BCC keys.
- RoutingKeys1 =
- [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
- Info = [{<<"reason">>,
- longstr, list_to_binary(atom_to_list(Reason))},
- {<<"queue">>, longstr, QName},
- {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000},
- {<<"exchange">>, longstr, Exchange#resource.name},
- {<<"routing-keys">>, array,
- [{longstr, Key} || Key <- RoutingKeys1]}],
+ RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
+ RKs1 = [{longstr, Key} || Key <- RKs],
+ Info = [{<<"reason">>, longstr, ReasonBin},
+ {<<"queue">>, longstr, QName},
+ {<<"time">>, timestamp, TimeSec},
+ {<<"exchange">>, longstr, Exchange#resource.name},
+ {<<"routing-keys">>, array, RKs1}],
HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>,
Info, Headers))
end,
@@ -1240,11 +1240,13 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
ChPid, AckTags, State,
case Requeue of
true -> fun (State1) -> requeue_and_run(AckTags, State1) end;
- false -> Fun = dead_letter_fun(rejected, State),
- fun (State1 = #q{backing_queue = BQ,
+ false -> fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
+ Fun = dead_letter_fun(rejected, State1),
BQS1 = BQ:fold(Fun, BQS, AckTags),
- State1#q{backing_queue_state = BQS1}
+ ack_if_no_dlx(
+ AckTags,
+ State1#q{backing_queue_state = BQS1})
end
end));
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 8ad59016..17d848da 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -63,7 +63,7 @@
-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()).
--spec(map_headers/2 :: (rabbit_types:content(), fun((headers()) -> headers()))
+-spec(map_headers/2 :: (fun((headers()) -> headers()), rabbit_types:content())
-> rabbit_types:content()).
-spec(header_routes/1 ::
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 846890a1..22c6a223 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -36,9 +36,9 @@
conn_name, limiter, tx_status, next_tag, unacked_message_q,
uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
virtual_host, most_recently_declared_queue, queue_monitors,
- consumer_mapping, blocking, queue_consumers, queue_collector_pid,
- stats_timer, confirm_enabled, publish_seqno, unconfirmed,
- confirmed, capabilities, trace_state}).
+ consumer_mapping, blocking, queue_consumers, delivering_queues,
+ queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
+ unconfirmed, confirmed, capabilities, trace_state}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -198,6 +198,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
consumer_mapping = dict:new(),
blocking = sets:new(),
queue_consumers = dict:new(),
+ delivering_queues = sets:new(),
queue_collector_pid = CollectorPid,
confirm_enabled = false,
publish_seqno = 1,
@@ -331,10 +332,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
State2 = queue_blocked(QPid, State1),
State3 = handle_consuming_queue_down(QPid, State2),
+ State4 = handle_delivering_queue_down(QPid, State3),
credit_flow:peer_down(QPid),
erase_queue_stats(QPid),
noreply(State3#ch{queue_monitors = pmon:erase(
- QPid, State3#ch.queue_monitors)});
+ QPid, State4#ch.queue_monitors)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -657,7 +659,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, _QPid, _MsgId, Redelivered,
+ Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}}} ->
@@ -669,7 +671,8 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, record_sent(none, not(NoAck), Msg, State)};
+ State1 = monitor_delivering_queue(NoAck, QPid, State),
+ {noreply, record_sent(none, not(NoAck), Msg, State1)};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -707,10 +710,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ActualConsumerTag})),
Q}
end) of
- {ok, Q} ->
- State1 = State#ch{consumer_mapping =
- dict:store(ActualConsumerTag, Q,
- ConsumerMapping)},
+ {ok, Q = #amqqueue{pid = QPid}} ->
+ CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
+ State1 = monitor_delivering_queue(
+ NoAck, QPid, State#ch{consumer_mapping = CM1}),
{noreply,
case NoWait of
true -> consumer_monitor(ActualConsumerTag, State1);
@@ -1108,6 +1111,13 @@ consumer_monitor(ConsumerTag,
State
end.
+monitor_delivering_queue(true, _QPid, State) ->
+ State;
+monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons,
+ delivering_queues = DQ}) ->
+ State#ch{queue_monitors = pmon:monitor(QPid, QMons),
+ delivering_queues = sets:add_element(QPid, DQ)}.
+
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
case rabbit_misc:is_abnormal_termination(Reason) of
true -> {MXs, UC1} = dtree:take_all(QPid, UC),
@@ -1134,6 +1144,9 @@ handle_consuming_queue_down(QPid,
State#ch{consumer_mapping = ConsumerMapping1,
queue_consumers = dict:erase(QPid, QCons)}.
+handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
+ State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
+
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
@@ -1269,9 +1282,11 @@ new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
notify_queues(State = #ch{state = closing}) ->
{ok, State};
-notify_queues(State = #ch{consumer_mapping = Consumers}) ->
- {rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()),
- State#ch{state = closing}}.
+notify_queues(State = #ch{consumer_mapping = Consumers,
+ delivering_queues = DQ }) ->
+ QPids = sets:to_list(
+ sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
+ {rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}.
fold_per_queue(_F, Acc, []) ->
Acc;
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 51f88c8f..8b24d2e3 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -194,7 +194,11 @@ action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
action(wait, Node, [PidFile], _Opts, Inform) ->
Inform("Waiting for ~p", [Node]),
- wait_for_application(Node, PidFile, Inform);
+ wait_for_application(Node, PidFile, rabbit, Inform);
+
+action(wait, Node, [PidFile, App], _Opts, Inform) ->
+ Inform("Waiting for ~p on ~p", [App, Node]),
+ wait_for_application(Node, PidFile, list_to_atom(App), Inform);
action(status, Node, [], _Opts, Inform) ->
Inform("Status of node ~p", [Node]),
@@ -379,17 +383,17 @@ action(eval, Node, [Expr], _Opts, _Inform) ->
%%----------------------------------------------------------------------------
-wait_for_application(Node, PidFile, Inform) ->
+wait_for_application(Node, PidFile, Application, Inform) ->
Pid = read_pid_file(PidFile, true),
Inform("pid is ~s", [Pid]),
- wait_for_application(Node, Pid).
+ wait_for_application(Node, Pid, Application).
-wait_for_application(Node, Pid) ->
+wait_for_application(Node, Pid, Application) ->
case process_up(Pid) of
- true -> case rabbit:is_running(Node) of
+ true -> case rabbit_nodes:is_running(Node, Application) of
true -> ok;
false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL),
- wait_for_application(Node, Pid)
+ wait_for_application(Node, Pid, Application)
end;
false -> {error, process_not_running}
end.
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 2d155d14..17e2ffb4 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -356,7 +356,7 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }).
-handle_info(send_gm_heartbeat, State = #state{gm = GM}) ->
+handle_info(send_gm_heartbeat, State = #state { gm = GM }) ->
gm:broadcast(GM, heartbeat),
ensure_gm_heartbeat(),
noreply(State);
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 9a972d9e..b6a9e263 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -16,7 +16,7 @@
-module(rabbit_nodes).
--export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0]).
+-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, is_running/2]).
-define(EPMD_TIMEOUT, 30000).
@@ -32,6 +32,7 @@
-spec(make/1 :: ({string(), string()} | string()) -> node()).
-spec(parts/1 :: (node() | string()) -> {string(), string()}).
-spec(cookie_hash/0 :: () -> string()).
+-spec(is_running/2 :: (node(), atom()) -> boolean()).
-endif.
@@ -92,3 +93,9 @@ parts(NodeStr) ->
cookie_hash() ->
base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))).
+
+is_running(Node, Application) ->
+ case rpc:call(Node, application, which_applications, [infinity]) of
+ {badrpc, _} -> false;
+ Apps -> proplists:is_defined(Application, Apps)
+ end.