summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2011-03-11 11:25:32 +0000
committerRob Harrop <rob@rabbitmq.com>2011-03-11 11:25:32 +0000
commit0f7d856796c0414368f004a854fcfa87a29bbb6c (patch)
tree48e0c8c3c4c4f4c5196ff51032b65caa8129a551
parent35fdfe2b60ecb3cc2b096dada42274dd90052002 (diff)
parent36f2b7b90445999485e57f5b7b3fec334a72c96a (diff)
downloadrabbitmq-server-0f7d856796c0414368f004a854fcfa87a29bbb6c.tar.gz
Merge with default
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--src/file_handle_cache.erl42
-rw-r--r--src/rabbit_amqqueue_process.erl33
-rw-r--r--src/rabbit_channel.erl168
-rw-r--r--src/rabbit_reader.erl19
5 files changed, 149 insertions, 114 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index f837684c..014c18b0 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -20,6 +20,7 @@
{vm_memory_high_watermark, 0.4},
{msg_store_index_module, rabbit_msg_store_ets_index},
{backing_queue_module, rabbit_variable_queue},
+ {frame_max, 131072},
{persister_max_wrap_entries, 500},
{persister_hibernate_after, 10000},
{msg_store_file_size_limit, 16777216},
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 6f8241b3..b26bb988 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -156,13 +156,6 @@
-define(SERVER, ?MODULE).
-define(RESERVED_FOR_OTHERS, 100).
-%% Googling around suggests that Windows has a limit somewhere around
-%% 16M, eg
-%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx
-%% however, it turns out that's only available through the win32
-%% API. Via the C Runtime, we have just 512:
-%% http://msdn.microsoft.com/en-us/library/6e3b887c%28VS.80%29.aspx
--define(FILE_HANDLES_LIMIT_WINDOWS, 512).
-define(FILE_HANDLES_LIMIT_OTHER, 1024).
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
@@ -1185,29 +1178,20 @@ track_client(Pid, Clients) ->
false -> ok
end.
-%% For all unices, assume ulimit exists. Further googling suggests
-%% that BSDs (incl OS X), solaris and linux all agree that ulimit -n
-%% is file handles
+
+%% To increase the number of file descriptors: on Windows set ERL_MAX_PORTS
+%% environment variable, on Linux set `ulimit -n`.
ulimit() ->
- case os:type() of
- {win32, _OsName} ->
- ?FILE_HANDLES_LIMIT_WINDOWS;
- {unix, _OsName} ->
- %% Under Linux, Solaris and FreeBSD, ulimit is a shell
- %% builtin, not a command. In OS X and AIX it's a command.
- %% Fortunately, os:cmd invokes the cmd in a shell env, so
- %% we're safe in all cases.
- case os:cmd("ulimit -n") of
- "unlimited" ->
- infinity;
- String = [C|_] when $0 =< C andalso C =< $9 ->
- list_to_integer(
- lists:takewhile(
- fun (D) -> $0 =< D andalso D =< $9 end, String));
- _ ->
- %% probably a variant of
- %% "/bin/sh: line 1: ulimit: command not found\n"
- unknown
+ case proplists:get_value(max_fds, erlang:system_info(check_io)) of
+ MaxFds when is_integer(MaxFds) andalso MaxFds > 1 ->
+ case os:type() of
+ {win32, _OsName} ->
+ %% On Windows max_fds is twice the number of open files:
+ %% https://github.com/yrashk/erlang/blob/e1282325ed75e52a98d5/erts/emulator/sys/win32/sys.c#L2459-2466
+ MaxFds div 2;
+ _Any ->
+ %% For other operating systems trust Erlang.
+ MaxFds
end;
_ ->
unknown
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b32fa0ff..54c92dc7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -422,19 +422,18 @@ gb_trees_cons(Key, Value, Tree) ->
end.
record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
- {no_confirm, State};
+ {never, State};
record_confirm_message(#delivery{sender = ChPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
is_persistent = true,
id = MsgId}},
- State =
- #q{msg_id_to_channel = MTC,
- q = #amqqueue{durable = true}}) ->
- {confirm,
+ State = #q{q = #amqqueue{durable = true},
+ msg_id_to_channel = MTC}) ->
+ {eventually,
State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)}};
record_confirm_message(_Delivery, State) ->
- {no_confirm, State}.
+ {immediately, State}.
run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
@@ -450,11 +449,9 @@ attempt_delivery(#delivery{txn = none,
message = Message,
msg_seq_no = MsgSeqNo},
{NeedsConfirming, State = #q{backing_queue = BQ}}) ->
- %% must confirm immediately if it has a MsgSeqNo and not NeedsConfirming
- case {NeedsConfirming, MsgSeqNo} of
- {_, undefined} -> ok;
- {no_confirm, _} -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
- {confirm, _} -> ok
+ case NeedsConfirming of
+ immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
+ _ -> ok
end,
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
@@ -466,7 +463,7 @@ attempt_delivery(#delivery{txn = none,
BQ:publish_delivered(
AckRequired, Message,
(?BASE_MESSAGE_PROPERTIES)#message_properties{
- needs_confirming = (NeedsConfirming =:= confirm)},
+ needs_confirming = (NeedsConfirming =:= eventually)},
BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
@@ -486,16 +483,16 @@ attempt_delivery(#delivery{txn = Txn,
deliver_or_enqueue(Delivery, State) ->
case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
{true, _, State1} ->
- {true, State1};
+ State1;
{false, NeedsConfirming, State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}} ->
#delivery{message = Message} = Delivery,
BQS1 = BQ:publish(Message,
(message_properties(State)) #message_properties{
needs_confirming =
- (NeedsConfirming =:= confirm)},
+ (NeedsConfirming =:= eventually)},
BQS),
- {false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})}
+ ensure_ttl_timer(State1#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
@@ -825,8 +822,7 @@ handle_call({deliver_immediately, Delivery}, _From, State) ->
handle_call({deliver, Delivery}, From, State) ->
%% Synchronous, "mandatory" delivery mode. Reply asap.
gen_server2:reply(From, true),
- {_Delivered, NewState} = deliver_or_enqueue(Delivery, State),
- noreply(NewState);
+ noreply(deliver_or_enqueue(Delivery, State));
handle_call({commit, Txn, ChPid}, From, State) ->
case lookup_ch(ChPid) of
@@ -988,8 +984,7 @@ handle_cast(sync_timeout, State) ->
handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- {_Delivered, NewState} = deliver_or_enqueue(Delivery, State),
- noreply(NewState);
+ noreply(deliver_or_enqueue(Delivery, State));
handle_cast({ack, Txn, AckTags, ChPid},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f584ff32..da103284 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -33,9 +33,9 @@
start_limiter_fun, transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm,
- confirmed, capabilities}).
+ consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
+ stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
+ unconfirmed_qm, confirmed, capabilities}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -176,6 +176,7 @@ init([Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
blocking = dict:new(),
+ consumer_monitors = dict:new(),
queue_collector_pid = CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
@@ -247,6 +248,11 @@ handle_cast(ready_for_close, State = #ch{state = closing,
handle_cast(terminate, State) ->
{stop, normal, State};
+handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg},
+ State = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(WriterPid, Msg),
+ noreply(monitor_consumer(ConsumerTag, State));
+
handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
noreply(State);
@@ -288,24 +294,15 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
handle_info(timeout, State) ->
noreply(State);
-handle_info({'DOWN', _MRef, process, QPid, Reason},
- State = #ch{unconfirmed_qm = UQM}) ->
- MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
- {value, MsgSet} -> gb_sets:to_list(MsgSet);
- none -> []
- end,
- %% We remove the MsgSeqNos from UQM before calling
- %% process_confirms to prevent each MsgSeqNo being removed from
- %% the set one by one which which would be inefficient
- State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
- {Nack, SendFun} = case Reason of
- normal -> {false, fun record_confirms/2};
- _ -> {true, fun send_nacks/2}
- end,
- {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
- erase_queue_stats(QPid),
- State3 = SendFun(MXs, State2),
- noreply(queue_blocked(QPid, State3)).
+handle_info({'DOWN', MRef, process, QPid, Reason},
+ State = #ch{consumer_monitors = ConsumerMonitors}) ->
+ noreply(
+ case dict:find(MRef, ConsumerMonitors) of
+ error ->
+ handle_publishing_queue_down(QPid, Reason, State);
+ {ok, ConsumerTag} ->
+ handle_consuming_queue_down(MRef, ConsumerTag, State)
+ end).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -692,9 +689,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait},
- _, State = #ch{reader_pid = ReaderPid,
- limiter_pid = LimiterPid,
- consumer_mapping = ConsumerMapping }) ->
+ _, State = #ch{reader_pid = ReaderPid,
+ limiter_pid = LimiterPid,
+ consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
@@ -711,18 +708,24 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) ->
- rabbit_amqqueue:basic_consume(
- Q, NoAck, self(), LimiterPid,
- ActualConsumerTag, ExclusiveConsume,
- ok_msg(NoWait, #'basic.consume_ok'{
- consumer_tag = ActualConsumerTag}))
+ {rabbit_amqqueue:basic_consume(
+ Q, NoAck, self(), LimiterPid,
+ ActualConsumerTag, ExclusiveConsume,
+ ok_msg(NoWait, #'basic.consume_ok'{
+ consumer_tag = ActualConsumerTag})),
+ Q}
end) of
- ok ->
- {noreply, State#ch{consumer_mapping =
- dict:store(ActualConsumerTag,
- QueueName,
- ConsumerMapping)}};
- {error, exclusive_consume_unavailable} ->
+ {ok, Q} ->
+ State1 = State#ch{consumer_mapping =
+ dict:store(ActualConsumerTag,
+ {Q, undefined},
+ ConsumerMapping)},
+ {noreply,
+ case NoWait of
+ true -> monitor_consumer(ActualConsumerTag, State1);
+ false -> State1
+ end};
+ {{error, exclusive_consume_unavailable}, _Q} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
[rabbit_misc:rs(QueueName)])
@@ -735,26 +738,31 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
nowait = NoWait},
- _, State = #ch{consumer_mapping = ConsumerMapping }) ->
+ _, State = #ch{consumer_mapping = ConsumerMapping,
+ consumer_monitors = ConsumerMonitors}) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
%% Spec requires we ignore this situation.
return_ok(State, NoWait, OkMsg);
- {ok, QueueName} ->
- NewState = State#ch{consumer_mapping =
- dict:erase(ConsumerTag,
- ConsumerMapping)},
- case rabbit_amqqueue:with(
- QueueName,
- fun (Q) ->
- %% In order to ensure that no more messages
- %% are sent to the consumer after the
- %% cancel_ok has been sent, we get the
- %% queue process to send the cancel_ok on
- %% our behalf. If we were sending the
- %% cancel_ok ourselves it might overtake a
- %% message sent previously by the queue.
+ {ok, {Q, MRef}} ->
+ ConsumerMonitors1 =
+ case MRef of
+ undefined -> ConsumerMonitors;
+ _ -> true = erlang:demonitor(MRef),
+ dict:erase(MRef, ConsumerMonitors)
+ end,
+ NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag,
+ ConsumerMapping),
+ consumer_monitors = ConsumerMonitors1},
+ %% In order to ensure that no more messages are sent to
+ %% the consumer after the cancel_ok has been sent, we get
+ %% the queue process to send the cancel_ok on our
+ %% behalf. If we were sending the cancel_ok ourselves it
+ %% might overtake a message sent previously by the queue.
+ case rabbit_misc:with_exit_handler(
+ fun () -> {error, not_found} end,
+ fun () ->
rabbit_amqqueue:basic_cancel(
Q, self(), ConsumerTag,
ok_msg(NoWait, #'basic.cancel_ok'{
@@ -1084,6 +1092,53 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
+monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping,
+ consumer_monitors = ConsumerMonitors,
+ capabilities = Capabilities}) ->
+ case rabbit_misc:table_lookup(
+ Capabilities, <<"consumer_cancel_notify">>) of
+ {bool, true} ->
+ {#amqqueue{pid = QPid} = Q, undefined} =
+ dict:fetch(ConsumerTag, ConsumerMapping),
+ MRef = erlang:monitor(process, QPid),
+ State#ch{consumer_mapping =
+ dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping),
+ consumer_monitors =
+ dict:store(MRef, ConsumerTag, ConsumerMonitors)};
+ _ ->
+ State
+ end.
+
+handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
+ MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSet} -> gb_sets:to_list(MsgSet);
+ none -> []
+ end,
+ %% We remove the MsgSeqNos from UQM before calling
+ %% process_confirms to prevent each MsgSeqNo being removed from
+ %% the set one by one which which would be inefficient
+ State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
+ {Nack, SendFun} = case Reason of
+ normal -> {false, fun record_confirms/2};
+ _ -> {true, fun send_nacks/2}
+ end,
+ {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
+ erase_queue_stats(QPid),
+ State3 = SendFun(MXs, State2),
+ queue_blocked(QPid, State3).
+
+handle_consuming_queue_down(MRef, ConsumerTag,
+ State = #ch{consumer_mapping = ConsumerMapping,
+ consumer_monitors = ConsumerMonitors,
+ writer_pid = WriterPid}) ->
+ ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping),
+ ConsumerMonitors1 = dict:erase(MRef, ConsumerMonitors),
+ Cancel = #'basic.cancel'{consumer_tag = ConsumerTag,
+ nowait = true},
+ ok = rabbit_writer:send_command(WriterPid, Cancel),
+ State#ch{consumer_mapping = ConsumerMapping1,
+ consumer_monitors = ConsumerMonitors1}.
+
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
@@ -1254,16 +1309,9 @@ limit_queues(LPid, #ch{consumer_mapping = Consumers}) ->
rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid).
consumer_queues(Consumers) ->
- [QPid || QueueName <-
- sets:to_list(
- dict:fold(fun (_ConsumerTag, QueueName, S) ->
- sets:add_element(QueueName, S)
- end, sets:new(), Consumers)),
- case rabbit_amqqueue:lookup(QueueName) of
- {ok, Q} -> QPid = Q#amqqueue.pid, true;
- %% queue has been deleted in the meantime
- {error, not_found} -> QPid = none, false
- end].
+ lists:usort([QPid ||
+ {_Key, {#amqqueue{pid = QPid}, _MRef}}
+ <- dict:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
%% for messages delivered to subscribed consumers, but not acks for
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 710e6878..5afe5560 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -35,7 +35,6 @@
-define(CLOSING_TIMEOUT, 1).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
-define(SILENT_CLOSE_DELAY, 3).
--define(FRAME_MAX, 131072). %% set to zero once QPid fix their negotiation
%%--------------------------------------------------------------------------
@@ -165,7 +164,8 @@ server_properties(Protocol) ->
server_capabilities(rabbit_framing_amqp_0_9_1) ->
[{<<"publisher_confirms">>, bool, true},
{<<"exchange_exchange_bindings">>, bool, true},
- {<<"basic.nack">>, bool, true}];
+ {<<"basic.nack">>, bool, true},
+ {<<"consumer_cancel_notify">>, bool, true}];
server_capabilities(_) ->
[].
@@ -641,14 +641,15 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
connection = Connection,
sock = Sock,
start_heartbeat_fun = SHF}) ->
- if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) ->
+ ServerFrameMax = server_frame_max(),
+ if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE ->
rabbit_misc:protocol_error(
not_allowed, "frame_max=~w < ~w min size",
[FrameMax, ?FRAME_MIN_SIZE]);
- (?FRAME_MAX /= 0) and (FrameMax > ?FRAME_MAX) ->
+ ServerFrameMax /= 0 andalso FrameMax > ServerFrameMax ->
rabbit_misc:protocol_error(
not_allowed, "frame_max=~w > ~w max size",
- [FrameMax, ?FRAME_MAX]);
+ [FrameMax, ServerFrameMax]);
true ->
Frame = rabbit_binary_generator:build_heartbeat_frame(),
SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
@@ -706,6 +707,12 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
+%% Compute frame_max for this instance. Could simply use 0, but breaks
+%% QPid Java client.
+server_frame_max() ->
+ {ok, FrameMax} = application:get_env(rabbit, frame_max),
+ FrameMax.
+
send_on_channel0(Sock, Method, Protocol) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
@@ -756,7 +763,7 @@ auth_phase(Response,
State#v1{auth_state = AuthState1};
{ok, User} ->
Tune = #'connection.tune'{channel_max = 0,
- frame_max = ?FRAME_MAX,
+ frame_max = server_frame_max(),
heartbeat = 0},
ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,