diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-11 07:43:12 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-11 07:43:12 +0000 |
commit | 3fdbf7a7a77de951aeedc6a482c2f61bf23f969f (patch) | |
tree | e7bd15fe35d24f74d8221a2746b4e6c8beec1cfb | |
parent | f6072bd336243017915a5f778b98c3a7b6922c9c (diff) | |
parent | 7372c6f05bd26d0e480183429d09be44676e5338 (diff) | |
download | rabbitmq-server-3fdbf7a7a77de951aeedc6a482c2f61bf23f969f.tar.gz |
merge from default
-rw-r--r-- | docs/rabbitmqctl.1.xml | 20 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 77 | ||||
-rw-r--r-- | src/rabbit_auth_backend_internal.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 186 | ||||
-rw-r--r-- | src/rabbit_net.erl | 10 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 26 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 17 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 42 |
8 files changed, 207 insertions, 173 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 9df4c1a8..2152cab3 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1010,6 +1010,26 @@ connection is secured with SSL.</para></listitem> </varlistentry> <varlistentry> + <term>ssl_protocol</term> + <listitem><para>SSL protocol + (e.g. tlsv1)</para></listitem> + </varlistentry> + <varlistentry> + <term>ssl_key_exchange</term> + <listitem><para>SSL key exchange algorithm + (e.g. rsa)</para></listitem> + </varlistentry> + <varlistentry> + <term>ssl_cipher</term> + <listitem><para>SSL cipher algorithm + (e.g. aes_256_cbc)</para></listitem> + </varlistentry> + <varlistentry> + <term>ssl_hash</term> + <listitem><para>SSL hash function + (e.g. sha)</para></listitem> + </varlistentry> + <varlistentry> <term>peer_cert_subject</term> <listitem><para>The subject of the peer's SSL certificate, in RFC4514 form.</para></listitem> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e6e3989f..fde54346 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -425,18 +425,36 @@ deliver_from_queue_deliver(AckRequired, false, State) -> fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. -confirm_messages(Guids, State) -> - lists:foldl(fun confirm_message_by_guid/2, State, Guids). - -confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) -> - case dict:find(Guid, GTC) of - {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo); - _ -> ok +confirm_messages(Guids, State = #q{guid_to_channel = GTC}) -> + {CMs, GTC1} = + lists:foldl( + fun(Guid, {CMs, GTC0}) -> + case dict:find(Guid, GTC0) of + {ok, {ChPid, MsgSeqNo}} -> + {[{ChPid, MsgSeqNo} | CMs], dict:erase(Guid, GTC0)}; + _ -> + {CMs, GTC0} + end + end, {[], GTC}, Guids), + case lists:usort(CMs) of + [{Ch, MsgSeqNo} | CMs1] -> + [rabbit_channel:confirm(ChPid, MsgSeqNos) || + {ChPid, MsgSeqNos} <- group_confirms_by_channel( + CMs1, [{Ch, [MsgSeqNo]}])]; + [] -> + ok end, - State#q{guid_to_channel = dict:erase(Guid, GTC)}. + State#q{guid_to_channel = GTC1}. + +group_confirms_by_channel([], Acc) -> + Acc; +group_confirms_by_channel([{Ch, Msg1} | CMs], [{Ch, Msgs} | Acc]) -> + group_confirms_by_channel(CMs, [{Ch, [Msg1 | Msgs]} | Acc]); +group_confirms_by_channel([{Ch, Msg1} | CMs], Acc) -> + group_confirms_by_channel(CMs, [{Ch, [Msg1]} | Acc]). record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> - State; + {no_confirm, State}; record_confirm_message(#delivery{sender = ChPid, msg_seq_no = MsgSeqNo, message = #basic_message { @@ -445,9 +463,10 @@ record_confirm_message(#delivery{sender = ChPid, State = #q{guid_to_channel = GTC, q = #amqqueue{durable = true}}) -> - State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}; + {confirm, + State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}}; record_confirm_message(_Delivery, State) -> - State. + {no_confirm, State}. run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, @@ -462,12 +481,12 @@ attempt_delivery(#delivery{txn = none, sender = ChPid, message = Message, msg_seq_no = MsgSeqNo}, - State = #q{backing_queue = BQ, q = Q}) -> - NeedsConfirming = Message#basic_message.is_persistent andalso - Q#amqqueue.durable, - case NeedsConfirming of - false -> rabbit_channel:confirm(ChPid, MsgSeqNo); - _ -> ok + {NeedsConfirming, State = #q{backing_queue = BQ}}) -> + %% must confirm immediately if it has a MsgSeqNo and not NeedsConfirming + case {NeedsConfirming, MsgSeqNo} of + {_, undefined} -> ok; + {no_confirm, _} -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); + {confirm, _} -> ok end, PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = @@ -479,31 +498,37 @@ attempt_delivery(#delivery{txn = none, BQ:publish_delivered( AckRequired, Message, (?BASE_MESSAGE_PROPERTIES)#message_properties{ - needs_confirming = NeedsConfirming}, + needs_confirming = (NeedsConfirming =:= confirm)}, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, - deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); + {Delivered, State1} = + deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State), + {Delivered, NeedsConfirming, State1}; attempt_delivery(#delivery{txn = Txn, sender = ChPid, message = Message}, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + {NeedsConfirming, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}}) -> record_current_channel_tx(ChPid, Txn), {true, + NeedsConfirming, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}. deliver_or_enqueue(Delivery, State) -> case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of - {true, State1} -> + {true, _, State1} -> {true, State1}; - {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> - #delivery{message = Message, msg_seq_no = MsgSeqNo} = Delivery, + {false, NeedsConfirming, State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}} -> + #delivery{message = Message} = Delivery, BQS1 = BQ:publish(Message, (message_properties(State)) #message_properties{ - needs_confirming = (MsgSeqNo =/= undefined)}, + needs_confirming = + (NeedsConfirming =:= confirm)}, BQS), {false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})} end. @@ -827,7 +852,7 @@ handle_call({deliver_immediately, Delivery}, %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, State1} = + {Delivered, _NeedsConfirming, State1} = attempt_delivery(Delivery, record_confirm_message(Delivery, State)), reply(Delivered, State1); diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 79910b95..233e2b90 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -97,7 +97,7 @@ description() -> {description, <<"Internal user / password database">>}]. check_user_login(Username, []) -> - internal_check_user_login(Username, fun() -> true end); + internal_check_user_login(Username, fun(_) -> true end); check_user_login(Username, [{password, Password}]) -> internal_check_user_login( Username, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 2067e306..579d7f49 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,7 +36,7 @@ -behaviour(gen_server2). -export([start_link/7, do/2, do/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, flushed/2, confirm/2, flush_confirms/1]). +-export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([emit_stats/1]). @@ -49,8 +49,7 @@ uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, confirm_multiple, confirm_tref, - held_confirms, unconfirmed, queues_for_msg}). + confirm_enabled, publish_seqno, unconfirmed, queues_for_msg}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -72,8 +71,6 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). --define(FLUSH_CONFIRMS_INTERVAL, 1000). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -97,8 +94,7 @@ (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). --spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok'). --spec(flush_confirms/1 :: (pid()) -> 'ok'). +-spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). @@ -137,11 +133,8 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). -confirm(Pid, MsgSeqNo) -> - gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}). - -flush_confirms(Pid) -> - gen_server2:cast(Pid, flush_confirms). +confirm(Pid, MsgSeqNos) -> + gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). list() -> pg_local:get_members(rabbit_channels). @@ -193,8 +186,6 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, publish_seqno = 0, - confirm_multiple = false, - held_confirms = gb_sets:new(), unconfirmed = gb_sets:new(), queues_for_msg = dict:new()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), @@ -292,11 +283,8 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, hibernate}; -handle_cast(flush_confirms, State) -> - {noreply, internal_flush_confirms(State)}; - -handle_cast({confirm, MsgSeqNo, From}, State) -> - {noreply, confirm(MsgSeqNo, From, State)}. +handle_cast({confirm, MsgSeqNos, From}, State) -> + {noreply, confirm(MsgSeqNos, From, State)}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, State = #ch{queues_for_msg = QFM}) -> @@ -304,7 +292,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) -> Qs = sets:del_element(QPid, QPids), case sets:size(Qs) of - 0 -> confirm(Msg, QPid, State0); + 0 -> confirm([Msg], QPid, State0); _ -> State0#ch{queues_for_msg = dict:store(Msg, Qs, QFM0)} end @@ -312,16 +300,15 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State1), hibernate}. -handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> +handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), - State1 = internal_flush_confirms(State), rabbit_event:if_enabled(StatsTimer, fun () -> internal_emit_stats( State, [{idle_since, now()}]) end), StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), - {hibernate, State1#ch{stats_timer = StatsTimer1}}. + {hibernate, State#ch{stats_timer = StatsTimer1}}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -484,51 +471,39 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -confirm(undefined, _QPid, State) -> +confirm([], _QPid, State) -> State; -confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> +confirm(_MsgSeqNos, _QPid, State = #ch{confirm_enabled = false}) -> State; -confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> - do_if_unconfirmed(MsgSeqNo, QPid, - fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:send_command( - WriterPid, #'basic.ack'{ - delivery_tag = MSN}), - State1 - end, State); -confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> - do_if_unconfirmed(MsgSeqNo, QPid, - fun(MSN, State1 = #ch{held_confirms = As}) -> - start_confirm_timer( - State1#ch{held_confirms = gb_sets:add(MSN, As)}) - end, State). - -do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, - State = #ch{unconfirmed = UC, - queues_for_msg = QFM}) -> - %% clears references to MsgSeqNo and does ConfirmFun - case gb_sets:is_element(MsgSeqNo, UC) of - true -> - Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC), - case QPid of - undefined -> - ConfirmFun(MsgSeqNo, State#ch{unconfirmed = Unconfirmed1}); - _ -> - {ok, Qs} = dict:find(MsgSeqNo, QFM), - Qs1 = sets:del_element(QPid, Qs), - case sets:size(Qs1) of - 0 -> ConfirmFun(MsgSeqNo, - State#ch{ - queues_for_msg = - dict:erase(MsgSeqNo, QFM), - unconfirmed = Unconfirmed1}); - _ -> State#ch{queues_for_msg = - dict:store(MsgSeqNo, Qs1, QFM)} - end - end; - false -> - State - end. +confirm(MsgSeqNos, undefined, State = #ch{unconfirmed = UC, + queues_for_msg = QFM}) -> + MsgSeqNos1 = [MSN || MSN <- MsgSeqNos, gb_sets:is_element(MSN, UC)], + MS = gb_sets:from_list(MsgSeqNos), + QFM1 = dict:filter(fun(M, _Q) -> not(gb_sets:is_element(M, MS)) end, QFM), + send_confirms(MsgSeqNos1, State#ch{unconfirmed = gb_sets:difference(UC, MS), + queues_for_msg = QFM1}); +confirm(MsgSeqNos, QPid, State) -> + {DoneMessages, State1} = + lists:foldl( + fun(MsgSeqNo, {DMs, State0 = #ch{unconfirmed = UC0, + queues_for_msg = QFM0}}) -> + case gb_sets:is_element(MsgSeqNo, UC0) of + false -> {DMs, State0}; + true -> {ok, Qs} = dict:find(MsgSeqNo, QFM0), + Qs1 = sets:del_element(QPid, Qs), + case sets:size(Qs1) of + 0 -> {[MsgSeqNo | DMs], + State0#ch{ + queues_for_msg = + dict:erase(MsgSeqNo, QFM0), + unconfirmed = + gb_sets:delete(MsgSeqNo, UC0)}}; + _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0), + {DMs, State0#ch{queues_for_msg = QFM1}} + end + end + end, {[], State}, MsgSeqNos), + send_confirms(DoneMessages, State1). handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -1010,20 +985,10 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId}) rabbit_misc:protocol_error( precondition_failed, "cannot switch from tx to confirm mode", []); -handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, - _, State = #ch{confirm_enabled = false}) -> - return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple}, +handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> + return_ok(State#ch{confirm_enabled = true}, NoWait, #'confirm.select_ok'{}); -handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, - _, State = #ch{confirm_enabled = true, - confirm_multiple = Multiple}) -> - return_ok(State, NoWait, #'confirm.select_ok'{}); - -handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) -> - rabbit_misc:protocol_error( - precondition_failed, "cannot change confirm_multiple setting", []); - handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter_pid = LimiterPid}) -> LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of @@ -1253,12 +1218,12 @@ is_message_persistent(Content) -> process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_route), - confirm(MsgSeqNo, undefined, State); + confirm([MsgSeqNo], undefined, State); process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_consumers), - confirm(MsgSeqNo, undefined, State); + confirm([MsgSeqNo], undefined, State); process_routing_result(routed, [], MsgSeqNo, _, State) -> - confirm(MsgSeqNo, undefined, State); + confirm([MsgSeqNo], undefined, State); process_routing_result(routed, _, undefined, _, State) -> State; process_routing_result(routed, QPids, MsgSeqNo, _, @@ -1272,47 +1237,28 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. -start_confirm_timer(State = #ch{confirm_tref = undefined}) -> - {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL, - ?MODULE, flush_confirms, [self()]), - State#ch{confirm_tref = TRef}; -start_confirm_timer(State) -> - State. - -stop_confirm_timer(State = #ch{confirm_tref = undefined}) -> +send_confirms([], State) -> State; -stop_confirm_timer(State = #ch{confirm_tref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), - State#ch{confirm_tref = undefined}. - -internal_flush_confirms(State = #ch{writer_pid = WriterPid, - held_confirms = Cs}) -> - case gb_sets:is_empty(Cs) of - true -> State#ch{confirm_tref = undefined}; - false -> [First | Rest] = gb_sets:to_list(Cs), - {Mult, Inds} = find_consecutive_sequence(First, Rest), - ok = rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = Mult, multiple = true}), - ok = lists:foldl( - fun(T, ok) -> rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = T}) - end, ok, Inds), - State#ch{held_confirms = gb_sets:new(), - confirm_tref = undefined} - end. - -%% Find longest sequence of consecutive numbers at the beginning. -find_consecutive_sequence(Last, []) -> - {Last, []}; -find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) -> - find_consecutive_sequence(N, Ns); -find_consecutive_sequence(Last, Ns) -> - {Last, Ns}. +send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> + SCs = lists:usort(Cs), + CutOff = case gb_sets:is_empty(UC) of + true -> lists:last(SCs) + 1; + false -> gb_sets:smallest(UC) + end, + {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs), + case Ms of + [] -> ok; + _ -> ok = rabbit_writer:send_command( + WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms), + multiple = true}) + end, + ok = lists:foldl(fun(T, ok) -> + rabbit_writer:send_command( + WriterPid, #'basic.ack'{delivery_tag = T}) + end, ok, Ss), + State. -terminate(State) -> - stop_confirm_timer(State), +terminate(_State) -> pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]). diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 89954b06..c6a083bb 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -32,7 +32,7 @@ -module(rabbit_net). -include("rabbit.hrl"). --export([is_ssl/1, controlling_process/2, getstat/2, +-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, async_recv/3, port_command/2, send/2, close/1, sockname/1, peername/1, peercert/1]). @@ -50,6 +50,9 @@ -type(socket() :: port() | #ssl_socket{}). -spec(is_ssl/1 :: (socket()) -> boolean()). +-spec(ssl_info/1 :: (socket()) + -> 'nossl' | ok_val_or_error( + {atom(), {atom(), atom(), atom()}})). -spec(controlling_process/2 :: (socket(), pid()) -> ok_or_any_error()). -spec(getstat/2 :: (socket(), [stat_option()]) @@ -77,6 +80,11 @@ is_ssl(Sock) -> ?IS_SSL(Sock). +ssl_info(Sock) when ?IS_SSL(Sock) -> + ssl:connection_info(Sock#ssl_socket.ssl); +ssl_info(_Sock) -> + nossl. + controlling_process(Sock, Pid) when ?IS_SSL(Sock) -> ssl:controlling_process(Sock#ssl_socket.ssl, Pid); controlling_process(Sock, Pid) when is_port(Sock) -> diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 76c0a4ef..2162104f 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -33,7 +33,7 @@ -export([init/2, shutdown_terms/1, recover/5, terminate/2, delete_and_terminate/1, - publish/5, deliver/2, ack/2, sync/2, flush/1, read/3, + publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). -export([add_queue_ttl/0]). @@ -297,11 +297,12 @@ deliver(SeqIds, State) -> ack(SeqIds, State) -> deliver_or_ack(ack, SeqIds, State). -sync([], State) -> - State; -sync(_SeqIds, State = #qistate { journal_handle = undefined }) -> - State; -sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> +%% This is only called when there are outstanding confirms and the +%% queue is idle. +sync(State = #qistate { unsynced_guids = Guids }) -> + sync_if([] =/= Guids, State). + +sync(SeqIds, State) -> %% The SeqIds here contains the SeqId of every publish and ack in %% the transaction. Ideally we should go through these seqids and %% only sync the journal if the pubs or acks appear in the @@ -309,9 +310,8 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> %% the variable queue publishes and acks to the qi, and then %% syncs, all in one operation, there is no possibility of the %% seqids not being in the journal, provided the transaction isn't - %% emptied (handled above anyway). - ok = file_handle_cache:sync(JournalHdl), - notify_sync(State). + %% emptied (handled by sync_if anyway). + sync_if([] =/= SeqIds, State). flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). @@ -723,6 +723,14 @@ deliver_or_ack(Kind, SeqIds, State) -> add_to_journal(SeqId, Kind, StateN) end, State1, SeqIds)). +sync_if(false, State) -> + State; +sync_if(_Bool, State = #qistate { journal_handle = undefined }) -> + State; +sync_if(true, State = #qistate { journal_handle = JournalHdl }) -> + ok = file_handle_cache:sync(JournalHdl), + notify_sync(State). + notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) -> OnSyncFun(gb_sets:from_list(UG)), State #qistate { unsynced_guids = [] }. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e87ff879..a3292d35 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -65,6 +65,8 @@ -define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl, peer_cert_subject, peer_cert_issuer, peer_cert_validity, auth_mechanism, + ssl_protocol, ssl_key_exchange, + ssl_cipher, ssl_hash, protocol, user, vhost, timeout, frame_max, client_properties]). @@ -905,6 +907,14 @@ i(peer_port, #v1{sock = Sock}) -> socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock); i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock); +i(ssl_protocol, #v1{sock = Sock}) -> + ssl_info(fun ({P, _}) -> P end, Sock); +i(ssl_key_exchange, #v1{sock = Sock}) -> + ssl_info(fun ({_, {K, _, _}}) -> K end, Sock); +i(ssl_cipher, #v1{sock = Sock}) -> + ssl_info(fun ({_, {_, C, _}}) -> C end, Sock); +i(ssl_hash, #v1{sock = Sock}) -> + ssl_info(fun ({_, {_, _, H}}) -> H end, Sock); i(peer_cert_issuer, #v1{sock = Sock}) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock); i(peer_cert_subject, #v1{sock = Sock}) -> @@ -955,6 +965,13 @@ socket_info(Get, Select) -> {error, _} -> '' end. +ssl_info(F, Sock) -> + case rabbit_net:ssl_info(Sock) of + nossl -> ''; + {error, _} -> ''; + {ok, Info} -> F(Info) + end. + cert_info(F, Sock) -> case rabbit_net:peercert(Sock) of nossl -> ''; diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 19d7c576..18423dd7 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -535,20 +535,20 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, in_counter = InCount, persistent_count = PCount, durable = IsDurable, - unconfirmed = Unconfirmed }) -> + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), - Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed), + UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC), {SeqId, a(reduce_memory_use( State2 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1, persistent_count = PCount1, - unconfirmed = Unconfirmed1 }))}. + unconfirmed = UC1 }))}. dropwhile(Pred, State) -> {_OkOrEmpty, State1} = dropwhile1(Pred, State), @@ -809,17 +809,22 @@ ram_duration(State = #vqstate { ram_msg_count_prev = RamMsgCount, ram_ack_count_prev = RamAckCount }}. -needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) -> - {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end, - fun (_Quota, State1) -> State1 end, - fun (State1) -> State1 end, - fun (_Quota, State1) -> {0, State1} end, - State), - Res; -needs_idle_timeout(_State) -> - true. +needs_idle_timeout(State = #vqstate { on_sync = OnSync, unconfirmed = UC }) -> + case {OnSync, gb_sets:is_empty(UC)} of + {?BLANK_SYNC, true} -> + {Res, _State} = reduce_memory_use( + fun (_Quota, State1) -> {0, State1} end, + fun (_Quota, State1) -> State1 end, + fun (State1) -> State1 end, + fun (_Quota, State1) -> {0, State1} end, + State), + Res; + _ -> + true + end. -idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))). +idle_timeout(State) -> + a(reduce_memory_use(confirm_commit_index(tx_commit_index(State)))). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. @@ -1232,7 +1237,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, persistent_count = PCount, durable = IsDurable, ram_msg_count = RamMsgCount, - unconfirmed = Unconfirmed }) -> + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, @@ -1242,13 +1247,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), - Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed), + UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC), {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, len = Len + 1, in_counter = InCount + 1, persistent_count = PCount1, ram_msg_count = RamMsgCount + 1, - unconfirmed = Unconfirmed1 }}. + unconfirmed = UC1 }}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, _MSCState) -> @@ -1386,6 +1391,11 @@ find_persistent_count(LensByStore) -> %% Internal plumbing for confirms (aka publisher acks) %%---------------------------------------------------------------------------- +confirm_commit_index(State = #vqstate { unconfirmed = [] }) -> + State; +confirm_commit_index(State = #vqstate { index_state = IndexState }) -> + State #vqstate { index_state = rabbit_queue_index:sync(IndexState) }. + remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, unconfirmed = UC }) -> |