diff options
author | Rob Harrop <rob@rabbitmq.com> | 2011-03-11 11:25:32 +0000 |
---|---|---|
committer | Rob Harrop <rob@rabbitmq.com> | 2011-03-11 11:25:32 +0000 |
commit | 0f7d856796c0414368f004a854fcfa87a29bbb6c (patch) | |
tree | 48e0c8c3c4c4f4c5196ff51032b65caa8129a551 | |
parent | 35fdfe2b60ecb3cc2b096dada42274dd90052002 (diff) | |
parent | 36f2b7b90445999485e57f5b7b3fec334a72c96a (diff) | |
download | rabbitmq-server-0f7d856796c0414368f004a854fcfa87a29bbb6c.tar.gz |
Merge with default
-rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
-rw-r--r-- | src/file_handle_cache.erl | 42 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 33 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 168 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 19 |
5 files changed, 149 insertions, 114 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index f837684c..014c18b0 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -20,6 +20,7 @@ {vm_memory_high_watermark, 0.4}, {msg_store_index_module, rabbit_msg_store_ets_index}, {backing_queue_module, rabbit_variable_queue}, + {frame_max, 131072}, {persister_max_wrap_entries, 500}, {persister_hibernate_after, 10000}, {msg_store_file_size_limit, 16777216}, diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 6f8241b3..b26bb988 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -156,13 +156,6 @@ -define(SERVER, ?MODULE). -define(RESERVED_FOR_OTHERS, 100). -%% Googling around suggests that Windows has a limit somewhere around -%% 16M, eg -%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx -%% however, it turns out that's only available through the win32 -%% API. Via the C Runtime, we have just 512: -%% http://msdn.microsoft.com/en-us/library/6e3b887c%28VS.80%29.aspx --define(FILE_HANDLES_LIMIT_WINDOWS, 512). -define(FILE_HANDLES_LIMIT_OTHER, 1024). -define(FILE_HANDLES_CHECK_INTERVAL, 2000). @@ -1185,29 +1178,20 @@ track_client(Pid, Clients) -> false -> ok end. -%% For all unices, assume ulimit exists. Further googling suggests -%% that BSDs (incl OS X), solaris and linux all agree that ulimit -n -%% is file handles + +%% To increase the number of file descriptors: on Windows set ERL_MAX_PORTS +%% environment variable, on Linux set `ulimit -n`. ulimit() -> - case os:type() of - {win32, _OsName} -> - ?FILE_HANDLES_LIMIT_WINDOWS; - {unix, _OsName} -> - %% Under Linux, Solaris and FreeBSD, ulimit is a shell - %% builtin, not a command. In OS X and AIX it's a command. - %% Fortunately, os:cmd invokes the cmd in a shell env, so - %% we're safe in all cases. - case os:cmd("ulimit -n") of - "unlimited" -> - infinity; - String = [C|_] when $0 =< C andalso C =< $9 -> - list_to_integer( - lists:takewhile( - fun (D) -> $0 =< D andalso D =< $9 end, String)); - _ -> - %% probably a variant of - %% "/bin/sh: line 1: ulimit: command not found\n" - unknown + case proplists:get_value(max_fds, erlang:system_info(check_io)) of + MaxFds when is_integer(MaxFds) andalso MaxFds > 1 -> + case os:type() of + {win32, _OsName} -> + %% On Windows max_fds is twice the number of open files: + %% https://github.com/yrashk/erlang/blob/e1282325ed75e52a98d5/erts/emulator/sys/win32/sys.c#L2459-2466 + MaxFds div 2; + _Any -> + %% For other operating systems trust Erlang. + MaxFds end; _ -> unknown diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b32fa0ff..54c92dc7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -422,19 +422,18 @@ gb_trees_cons(Key, Value, Tree) -> end. record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> - {no_confirm, State}; + {never, State}; record_confirm_message(#delivery{sender = ChPid, msg_seq_no = MsgSeqNo, message = #basic_message { is_persistent = true, id = MsgId}}, - State = - #q{msg_id_to_channel = MTC, - q = #amqqueue{durable = true}}) -> - {confirm, + State = #q{q = #amqqueue{durable = true}, + msg_id_to_channel = MTC}) -> + {eventually, State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)}}; record_confirm_message(_Delivery, State) -> - {no_confirm, State}. + {immediately, State}. run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, @@ -450,11 +449,9 @@ attempt_delivery(#delivery{txn = none, message = Message, msg_seq_no = MsgSeqNo}, {NeedsConfirming, State = #q{backing_queue = BQ}}) -> - %% must confirm immediately if it has a MsgSeqNo and not NeedsConfirming - case {NeedsConfirming, MsgSeqNo} of - {_, undefined} -> ok; - {no_confirm, _} -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); - {confirm, _} -> ok + case NeedsConfirming of + immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); + _ -> ok end, PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = @@ -466,7 +463,7 @@ attempt_delivery(#delivery{txn = none, BQ:publish_delivered( AckRequired, Message, (?BASE_MESSAGE_PROPERTIES)#message_properties{ - needs_confirming = (NeedsConfirming =:= confirm)}, + needs_confirming = (NeedsConfirming =:= eventually)}, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} @@ -486,16 +483,16 @@ attempt_delivery(#delivery{txn = Txn, deliver_or_enqueue(Delivery, State) -> case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of {true, _, State1} -> - {true, State1}; + State1; {false, NeedsConfirming, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> #delivery{message = Message} = Delivery, BQS1 = BQ:publish(Message, (message_properties(State)) #message_properties{ needs_confirming = - (NeedsConfirming =:= confirm)}, + (NeedsConfirming =:= eventually)}, BQS), - {false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})} + ensure_ttl_timer(State1#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> @@ -825,8 +822,7 @@ handle_call({deliver_immediately, Delivery}, _From, State) -> handle_call({deliver, Delivery}, From, State) -> %% Synchronous, "mandatory" delivery mode. Reply asap. gen_server2:reply(From, true), - {_Delivered, NewState} = deliver_or_enqueue(Delivery, State), - noreply(NewState); + noreply(deliver_or_enqueue(Delivery, State)); handle_call({commit, Txn, ChPid}, From, State) -> case lookup_ch(ChPid) of @@ -988,8 +984,7 @@ handle_cast(sync_timeout, State) -> handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - {_Delivered, NewState} = deliver_or_enqueue(Delivery, State), - noreply(NewState); + noreply(deliver_or_enqueue(Delivery, State)); handle_cast({ack, Txn, AckTags, ChPid}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f584ff32..da103284 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -33,9 +33,9 @@ start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm, - confirmed, capabilities}). + consumer_mapping, blocking, consumer_monitors, queue_collector_pid, + stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, + unconfirmed_qm, confirmed, capabilities}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -176,6 +176,7 @@ init([Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities, most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), blocking = dict:new(), + consumer_monitors = dict:new(), queue_collector_pid = CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, @@ -247,6 +248,11 @@ handle_cast(ready_for_close, State = #ch{state = closing, handle_cast(terminate, State) -> {stop, normal, State}; +handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg}, + State = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command(WriterPid, Msg), + noreply(monitor_consumer(ConsumerTag, State)); + handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), noreply(State); @@ -288,24 +294,15 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> handle_info(timeout, State) -> noreply(State); -handle_info({'DOWN', _MRef, process, QPid, Reason}, - State = #ch{unconfirmed_qm = UQM}) -> - MsgSeqNos = case gb_trees:lookup(QPid, UQM) of - {value, MsgSet} -> gb_sets:to_list(MsgSet); - none -> [] - end, - %% We remove the MsgSeqNos from UQM before calling - %% process_confirms to prevent each MsgSeqNo being removed from - %% the set one by one which which would be inefficient - State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, - {Nack, SendFun} = case Reason of - normal -> {false, fun record_confirms/2}; - _ -> {true, fun send_nacks/2} - end, - {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), - erase_queue_stats(QPid), - State3 = SendFun(MXs, State2), - noreply(queue_blocked(QPid, State3)). +handle_info({'DOWN', MRef, process, QPid, Reason}, + State = #ch{consumer_monitors = ConsumerMonitors}) -> + noreply( + case dict:find(MRef, ConsumerMonitors) of + error -> + handle_publishing_queue_down(QPid, Reason, State); + {ok, ConsumerTag} -> + handle_consuming_queue_down(MRef, ConsumerTag, State) + end). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), @@ -692,9 +689,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_ack = NoAck, exclusive = ExclusiveConsume, nowait = NoWait}, - _, State = #ch{reader_pid = ReaderPid, - limiter_pid = LimiterPid, - consumer_mapping = ConsumerMapping }) -> + _, State = #ch{reader_pid = ReaderPid, + limiter_pid = LimiterPid, + consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), @@ -711,18 +708,24 @@ handle_method(#'basic.consume'{queue = QueueNameBin, case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> - rabbit_amqqueue:basic_consume( - Q, NoAck, self(), LimiterPid, - ActualConsumerTag, ExclusiveConsume, - ok_msg(NoWait, #'basic.consume_ok'{ - consumer_tag = ActualConsumerTag})) + {rabbit_amqqueue:basic_consume( + Q, NoAck, self(), LimiterPid, + ActualConsumerTag, ExclusiveConsume, + ok_msg(NoWait, #'basic.consume_ok'{ + consumer_tag = ActualConsumerTag})), + Q} end) of - ok -> - {noreply, State#ch{consumer_mapping = - dict:store(ActualConsumerTag, - QueueName, - ConsumerMapping)}}; - {error, exclusive_consume_unavailable} -> + {ok, Q} -> + State1 = State#ch{consumer_mapping = + dict:store(ActualConsumerTag, + {Q, undefined}, + ConsumerMapping)}, + {noreply, + case NoWait of + true -> monitor_consumer(ActualConsumerTag, State1); + false -> State1 + end}; + {{error, exclusive_consume_unavailable}, _Q} -> rabbit_misc:protocol_error( access_refused, "~s in exclusive use", [rabbit_misc:rs(QueueName)]) @@ -735,26 +738,31 @@ handle_method(#'basic.consume'{queue = QueueNameBin, handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, - _, State = #ch{consumer_mapping = ConsumerMapping }) -> + _, State = #ch{consumer_mapping = ConsumerMapping, + consumer_monitors = ConsumerMonitors}) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, case dict:find(ConsumerTag, ConsumerMapping) of error -> %% Spec requires we ignore this situation. return_ok(State, NoWait, OkMsg); - {ok, QueueName} -> - NewState = State#ch{consumer_mapping = - dict:erase(ConsumerTag, - ConsumerMapping)}, - case rabbit_amqqueue:with( - QueueName, - fun (Q) -> - %% In order to ensure that no more messages - %% are sent to the consumer after the - %% cancel_ok has been sent, we get the - %% queue process to send the cancel_ok on - %% our behalf. If we were sending the - %% cancel_ok ourselves it might overtake a - %% message sent previously by the queue. + {ok, {Q, MRef}} -> + ConsumerMonitors1 = + case MRef of + undefined -> ConsumerMonitors; + _ -> true = erlang:demonitor(MRef), + dict:erase(MRef, ConsumerMonitors) + end, + NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag, + ConsumerMapping), + consumer_monitors = ConsumerMonitors1}, + %% In order to ensure that no more messages are sent to + %% the consumer after the cancel_ok has been sent, we get + %% the queue process to send the cancel_ok on our + %% behalf. If we were sending the cancel_ok ourselves it + %% might overtake a message sent previously by the queue. + case rabbit_misc:with_exit_handler( + fun () -> {error, not_found} end, + fun () -> rabbit_amqqueue:basic_cancel( Q, self(), ConsumerTag, ok_msg(NoWait, #'basic.cancel_ok'{ @@ -1084,6 +1092,53 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- +monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, + consumer_monitors = ConsumerMonitors, + capabilities = Capabilities}) -> + case rabbit_misc:table_lookup( + Capabilities, <<"consumer_cancel_notify">>) of + {bool, true} -> + {#amqqueue{pid = QPid} = Q, undefined} = + dict:fetch(ConsumerTag, ConsumerMapping), + MRef = erlang:monitor(process, QPid), + State#ch{consumer_mapping = + dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping), + consumer_monitors = + dict:store(MRef, ConsumerTag, ConsumerMonitors)}; + _ -> + State + end. + +handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> + MsgSeqNos = case gb_trees:lookup(QPid, UQM) of + {value, MsgSet} -> gb_sets:to_list(MsgSet); + none -> [] + end, + %% We remove the MsgSeqNos from UQM before calling + %% process_confirms to prevent each MsgSeqNo being removed from + %% the set one by one which which would be inefficient + State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, + {Nack, SendFun} = case Reason of + normal -> {false, fun record_confirms/2}; + _ -> {true, fun send_nacks/2} + end, + {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), + erase_queue_stats(QPid), + State3 = SendFun(MXs, State2), + queue_blocked(QPid, State3). + +handle_consuming_queue_down(MRef, ConsumerTag, + State = #ch{consumer_mapping = ConsumerMapping, + consumer_monitors = ConsumerMonitors, + writer_pid = WriterPid}) -> + ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping), + ConsumerMonitors1 = dict:erase(MRef, ConsumerMonitors), + Cancel = #'basic.cancel'{consumer_tag = ConsumerTag, + nowait = true}, + ok = rabbit_writer:send_command(WriterPid, Cancel), + State#ch{consumer_mapping = ConsumerMapping1, + consumer_monitors = ConsumerMonitors1}. + binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, @@ -1254,16 +1309,9 @@ limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid). consumer_queues(Consumers) -> - [QPid || QueueName <- - sets:to_list( - dict:fold(fun (_ConsumerTag, QueueName, S) -> - sets:add_element(QueueName, S) - end, sets:new(), Consumers)), - case rabbit_amqqueue:lookup(QueueName) of - {ok, Q} -> QPid = Q#amqqueue.pid, true; - %% queue has been deleted in the meantime - {error, not_found} -> QPid = none, false - end]. + lists:usort([QPid || + {_Key, {#amqqueue{pid = QPid}, _MRef}} + <- dict:to_list(Consumers)]). %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, but not acks for diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 710e6878..5afe5560 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -35,7 +35,6 @@ -define(CLOSING_TIMEOUT, 1). -define(CHANNEL_TERMINATION_TIMEOUT, 3). -define(SILENT_CLOSE_DELAY, 3). --define(FRAME_MAX, 131072). %% set to zero once QPid fix their negotiation %%-------------------------------------------------------------------------- @@ -165,7 +164,8 @@ server_properties(Protocol) -> server_capabilities(rabbit_framing_amqp_0_9_1) -> [{<<"publisher_confirms">>, bool, true}, {<<"exchange_exchange_bindings">>, bool, true}, - {<<"basic.nack">>, bool, true}]; + {<<"basic.nack">>, bool, true}, + {<<"consumer_cancel_notify">>, bool, true}]; server_capabilities(_) -> []. @@ -641,14 +641,15 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, connection = Connection, sock = Sock, start_heartbeat_fun = SHF}) -> - if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) -> + ServerFrameMax = server_frame_max(), + if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE -> rabbit_misc:protocol_error( not_allowed, "frame_max=~w < ~w min size", [FrameMax, ?FRAME_MIN_SIZE]); - (?FRAME_MAX /= 0) and (FrameMax > ?FRAME_MAX) -> + ServerFrameMax /= 0 andalso FrameMax > ServerFrameMax -> rabbit_misc:protocol_error( not_allowed, "frame_max=~w > ~w max size", - [FrameMax, ?FRAME_MAX]); + [FrameMax, ServerFrameMax]); true -> Frame = rabbit_binary_generator:build_heartbeat_frame(), SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, @@ -706,6 +707,12 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). +%% Compute frame_max for this instance. Could simply use 0, but breaks +%% QPid Java client. +server_frame_max() -> + {ok, FrameMax} = application:get_env(rabbit, frame_max), + FrameMax. + send_on_channel0(Sock, Method, Protocol) -> ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). @@ -756,7 +763,7 @@ auth_phase(Response, State#v1{auth_state = AuthState1}; {ok, User} -> Tune = #'connection.tune'{channel_max = 0, - frame_max = ?FRAME_MAX, + frame_max = server_frame_max(), heartbeat = 0}, ok = send_on_channel0(Sock, Tune, Protocol), State#v1{connection_state = tuning, |