diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-25 22:29:37 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-25 22:29:37 +0100 |
commit | b3efbd8b78d4a115a6443def25f5fa29304999e6 (patch) | |
tree | 5bd18b9e086a3407b28e032b375046f7e1a61028 | |
parent | 7a58f836896da8ba06ae5376f08e4f45f81923bd (diff) | |
parent | a3609bcb7c7ada5b3706a83e097a4950074bb6d3 (diff) | |
download | rabbitmq-server-b3efbd8b78d4a115a6443def25f5fa29304999e6.tar.gz |
merge from default again
-rw-r--r-- | include/rabbit.hrl | 7 | ||||
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 21 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 155 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 14 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 237 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 170 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 43 | ||||
-rw-r--r-- | src/rabbit_router.erl | 20 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 33 | ||||
-rw-r--r-- | src/rabbit_types.erl | 3 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 134 |
12 files changed, 625 insertions, 215 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index ae672fc9..df22b573 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -69,12 +69,13 @@ is_persistent}). -record(ssl_socket, {tcp, ssl}). --record(delivery, {mandatory, immediate, txn, sender, message}). --record(amqp_error, {name, explanation = "", method = none}). +-record(delivery, {mandatory, immediate, txn, sender, message, + origin, msg_seq_no}). +-record(amqp_error, {name, explanation, method = none}). -record(event, {type, props, timestamp}). --record(message_properties, {expiry}). +-record(message_properties, {expiry, needs_confirming = false}). %%---------------------------------------------------------------------------- diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 20230b24..f67c6f46 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -37,6 +37,7 @@ -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). -type(ack_required() :: boolean()). +-type(confirm_required() :: boolean()). -type(message_properties_transformer() :: fun ((rabbit_types:message_properties()) -> rabbit_types:message_properties())). @@ -57,7 +58,7 @@ (fun ((rabbit_types:message_properties()) -> boolean()), state()) -> state()). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). --spec(ack/2 :: ([ack()], state()) -> state()). +-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). -spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(), rabbit_types:message_properties(), state()) -> state()). -spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9d78bafa..f7583482 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -34,6 +34,7 @@ -export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, + maybe_run_queue_via_backing_queue_async/2, update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2, maybe_expire/1, drop_expired/1]). -export([pseudo_queue/2]). @@ -157,6 +158,8 @@ rabbit_types:connection_exit()). -spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> A))) -> 'ok'). +-spec(maybe_run_queue_via_backing_queue_async/2 :: + (pid(), (fun ((A) -> A))) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). @@ -380,16 +383,13 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity). -deliver(QPid, #delivery{immediate = true, - txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:call(QPid, {deliver_immediately, Txn, Message, ChPid}, - infinity); -deliver(QPid, #delivery{mandatory = true, - txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:call(QPid, {deliver, Txn, Message, ChPid}, infinity), +deliver(QPid, Delivery = #delivery{immediate = true}) -> + gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity); +deliver(QPid, Delivery = #delivery{mandatory = true}) -> + gen_server2:call(QPid, {deliver, Delivery}, infinity), true; -deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}), +deliver(QPid, Delivery) -> + gen_server2:cast(QPid, {deliver, Delivery}), true. requeue(QPid, MsgIds, ChPid) -> @@ -466,6 +466,9 @@ internal_delete(QueueName) -> maybe_run_queue_via_backing_queue(QPid, Fun) -> gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity). +maybe_run_queue_via_backing_queue_async(QPid, Fun) -> + gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}). + update_ram_duration(QPid) -> gen_server2:cast(QPid, update_ram_duration). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6048920e..f6255d2e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,7 +39,8 @@ -define(SYNC_INTERVAL, 5). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). --define(BASE_MESSAGE_PROPERTIES, #message_properties{expiry = undefined}). +-define(BASE_MESSAGE_PROPERTIES, + #message_properties{expiry = undefined, needs_confirming = false}). -export([start_link/1, info_keys/0]). @@ -64,6 +65,7 @@ rate_timer_ref, expiry_timer_ref, stats_timer, + guid_to_channel, ttl, ttl_timer_ref }). @@ -128,7 +130,8 @@ init(Q) -> rate_timer_ref = undefined, expiry_timer_ref = undefined, ttl = undefined, - stats_timer = rabbit_event:init_stats_timer()}, hibernate, + stats_timer = rabbit_event:init_stats_timer(), + guid_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -354,11 +357,13 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), - ChAckTags1 = case AckRequired of - true -> sets:add_element( - AckTag, ChAckTags); - false -> ChAckTags - end, + {State2, ChAckTags1} = + case AckRequired of + true -> {State1, + sets:add_element(AckTag, ChAckTags)}; + false -> {confirm_message(Message, State1), + ChAckTags} + end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, store_ch_record(NewC), @@ -374,10 +379,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, - State2 = State1#q{ + State3 = State2#q{ active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers}, - deliver_msgs_to_consumers(Funs, FunAcc1, State2); + deliver_msgs_to_consumers(Funs, FunAcc1, State3); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> store_ch_record(C#cr{is_limit_active = true}), @@ -405,6 +410,33 @@ deliver_from_queue_deliver(AckRequired, false, State) -> fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. +confirm_messages(Guids, State) -> + lists:foldl(fun confirm_message_by_guid/2, State, Guids). + +confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) -> + case dict:find(Guid, GTC) of + {ok, {_ , undefined}} -> ok; + {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo); + _ -> ok + end, + State#q{guid_to_channel = dict:erase(Guid, GTC)}. + +confirm_message(#basic_message{guid = Guid}, State) -> + confirm_message_by_guid(Guid, State). + +record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> + State; +record_confirm_message(#delivery{msg_seq_no = MsgSeqNo, + sender = ChPid, + message = #basic_message{guid = Guid}}, + State = #q{guid_to_channel = GTC}) -> + State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}. + +ack_by_acktags(AckTags, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {AckdGuids, BQS1} = BQ:ack(AckTags, BQS), + confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}). + run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3}, @@ -414,7 +446,10 @@ run_message_queue(State) -> {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), State2. -attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> +attempt_delivery(#delivery{txn = none, + message = Message, + msg_seq_no = MsgSeqNo}, + State = #q{backing_queue = BQ}) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> @@ -423,28 +458,36 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> %% message_properties. {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Message, - ?BASE_MESSAGE_PROPERTIES, BQS), + ?BASE_MESSAGE_PROPERTIES + #message_properties { + needs_confirming = + (MsgSeqNo =/= undefined)}, + BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +attempt_delivery(#delivery{txn = Txn, + sender = ChPid, + message = Message}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}. -deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> - case attempt_delivery(Txn, ChPid, Message, State) of - {true, NewState} -> - {true, NewState}; - {false, NewState} -> - %% Txn is none and no unblocked channels with consumers - BQS = BQ:publish(Message, - message_properties(State), - State #q.backing_queue_state), - {false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})} +deliver_or_enqueue(Delivery, State) -> + case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of + {true, State1} -> + {true, State1}; + {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> + #delivery{message = Message, msg_seq_no = MsgSeqNo} = Delivery, + BQS1 = BQ:publish(Message, + (message_properties(State)) #message_properties { + needs_confirming = (MsgSeqNo =/= undefined) }, + BQS), + {false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})} end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> @@ -549,7 +592,12 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - run_message_queue(State#q{backing_queue_state = Fun(BQS)}). + {BQS2, State1} = + case Fun(BQS) of + {BQS1, {confirm, Guids}} -> {BQS1, confirm_messages(Guids, State)}; + BQS1 -> {BQS1, State} + end, + run_message_queue(State1#q{backing_queue_state = BQS2}). commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS, @@ -725,7 +773,8 @@ handle_call(consumers, _From, [{ChPid, ConsumerTag, AckRequired} | Acc] end, [], queue:join(ActiveConsumers, BlockedConsumers)), State); -handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> +handle_call({deliver_immediately, Delivery = #delivery{message = Message}}, + _From, State) -> %% Synchronous, "immediate" delivery mode %% %% FIXME: Is this correct semantics? @@ -739,12 +788,16 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), - reply(Delivered, NewState); - -handle_call({deliver, Txn, Message, ChPid}, _From, State) -> + {Delivered, State1} = + attempt_delivery(Delivery, record_confirm_message(Delivery, State)), + reply(Delivered, case Delivered of + true -> State1; + false -> confirm_message(Message, State1) + end); + +handle_call({deliver, Delivery}, _From, State) -> %% Synchronous, "mandatory" delivery mode - {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), + {Delivered, NewState} = deliver_or_enqueue(Delivery, State), reply(Delivered, NewState); handle_call({commit, Txn, ChPid}, From, State) -> @@ -770,14 +823,16 @@ handle_call({basic_get, ChPid, NoAck}, _From, {empty, State2} -> reply(empty, State2); {{Message, IsDelivered, AckTag, Remaining}, State2} -> - case AckRequired of - true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - store_ch_record( - C#cr{acktags = sets:add_element(AckTag, ChAckTags)}); - false -> ok - end, + State3 = + case AckRequired of + true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), + ChAckTags1 = sets:add_element(AckTag, ChAckTags), + store_ch_record(C#cr{acktags = ChAckTags1}), + State2; + false -> confirm_message(Message, State2) + end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, State2) + reply({ok, Remaining, Msg}, State3) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, @@ -887,9 +942,13 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). -handle_cast({deliver, Txn, Message, ChPid}, State) -> + +handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> + noreply(maybe_run_queue_via_backing_queue(Fun, State)); + +handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), + {_Delivered, NewState} = deliver_or_enqueue(Delivery, State), noreply(NewState); handle_cast({ack, Txn, AckTags, ChPid}, @@ -898,18 +957,21 @@ handle_cast({ack, Txn, AckTags, ChPid}, not_found -> noreply(State); C = #cr{acktags = ChAckTags} -> - {C1, BQS1} = + {C1, State1} = case Txn of none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), - {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)}; - _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)} + NewC = C#cr{acktags = ChAckTags1}, + NewState = ack_by_acktags(AckTags, State), + {NewC, NewState}; + _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS), + {C#cr{txn = Txn}, + State#q{backing_queue_state = BQS1}} end, store_ch_record(C1), - noreply(State#q{backing_queue_state = BQS1}) + noreply(State1) end; -handle_cast({reject, AckTags, Requeue, ChPid}, - State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> +handle_cast({reject, AckTags, Requeue, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> noreply(State); @@ -918,8 +980,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> BQS1 = BQ:ack(AckTags, BQS), - State #q { backing_queue_state = BQS1 } + false -> ack_by_acktags(AckTags, State) end) end; diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 38412982..1ac39b65 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/4, properties/1, delivery/4]). +-export([publish/1, message/4, properties/1, delivery/5]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). -export([is_message_persistent/1]). @@ -50,9 +50,10 @@ -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). --spec(delivery/4 :: +-spec(delivery/5 :: (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), - rabbit_types:message()) -> rabbit_types:delivery()). + rabbit_types:message(), undefined | integer()) -> + rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(), binary()) -> @@ -88,9 +89,9 @@ publish(Delivery = #delivery{ Other end. -delivery(Mandatory, Immediate, Txn, Message) -> +delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, - sender = self(), message = Message}. + sender = self(), message = Message, msg_seq_no = MsgSeqNo}. build_content(Properties, BodyBin) -> %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 @@ -157,7 +158,8 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties, BodyBin) -> publish(delivery(Mandatory, Immediate, Txn, message(ExchangeName, RoutingKeyBin, - properties(Properties), BodyBin))). + properties(Properties), BodyBin), + undefined)). is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 58c8e341..7c45b52d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -38,7 +38,7 @@ -export([start_link/7, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([emit_stats/1, flush/1]). +-export([emit_stats/1, flush/1, flush_multiple_acks/1, confirm/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -48,7 +48,9 @@ start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, queue_collector_pid, stats_timer}). + consumer_mapping, blocking, queue_collector_pid, stats_timer, + confirm_enabled, published_count, confirm_multiple, confirm_tref, + held_confirms, unconfirmed, queues_for_msg}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -70,6 +72,8 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). +-define(FLUSH_MULTIPLE_ACKS_INTERVAL, 1000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -99,6 +103,8 @@ -spec(info_all/0 :: () -> [[rabbit_types:info()]]). -spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]). -spec(emit_stats/1 :: (pid()) -> 'ok'). +-spec(flush_multiple_acks/1 :: (pid()) -> 'ok'). +-spec(confirm/2 ::(pid(), integer()) -> 'ok'). -endif. @@ -153,6 +159,12 @@ emit_stats(Pid) -> flush(Pid) -> gen_server2:call(Pid, flush). +flush_multiple_acks(Pid) -> + gen_server2:cast(Pid, flush_multiple_acks). + +confirm(Pid, MsgSeqNo) -> + gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}). + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, @@ -177,7 +189,13 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, consumer_mapping = dict:new(), blocking = dict:new(), queue_collector_pid = CollectorPid, - stats_timer = StatsTimer}, + stats_timer = StatsTimer, + confirm_enabled = false, + published_count = 0, + confirm_multiple = false, + held_confirms = gb_sets:new(), + unconfirmed = gb_sets:new(), + queues_for_msg = dict:new()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -258,19 +276,46 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> internal_emit_stats(State), {noreply, - State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}}. - -handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> + State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}}; + +handle_cast(flush_multiple_acks, + State = #ch{writer_pid = WriterPid, + held_confirms = As, + unconfirmed = UC}) -> + flush_multiple(WriterPid, As, UC), + {noreply, State#ch{held_confirms = gb_sets:new(), + confirm_tref = undefined}}; + +handle_cast({confirm, MsgSeqNo, From}, State) -> + {noreply, send_or_enqueue_ack(MsgSeqNo, From, State)}. + +handle_info({'DOWN', _MRef, process, QPid, _Reason}, + State = #ch{queues_for_msg = QFM}) -> + State1 = dict:fold( + fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) -> + Qs = sets:del_element(QPid, QPids), + case sets:size(Qs) of + 0 -> send_or_enqueue_ack(Msg, QPid, State0); + _ -> State0#ch{queues_for_msg = + dict:store(Msg, Qs, QFM0)} + end + end, State, QFM), erase_queue_stats(QPid), - {noreply, queue_blocked(QPid, State)}. + {noreply, queue_blocked(QPid, State1)}. -handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> +handle_pre_hibernate(State = #ch{writer_pid = WriterPid, + held_confirms = As, + stats_timer = StatsTimer, + unconfirmed = UC}) -> ok = clear_permission_cache(), - rabbit_event:if_enabled(StatsTimer, fun () -> + flush_multiple(WriterPid, As, UC), + rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), - {hibernate, - State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}. + StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), + {hibernate, State#ch{held_confirms = gb_sets:new(), + stats_timer = StatsTimer1, + confirm_tref = undefined}}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -415,6 +460,53 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. +send_or_enqueue_ack(undefined, _QPid, State) -> + State; +send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> + State; +send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> + do_if_unconfirmed( + MsgSeqNo, QPid, + fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.ack'{delivery_tag = MSN}), + State1 + end, State); +send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> + do_if_unconfirmed( + MsgSeqNo, QPid, + fun(MSN, State1 = #ch{held_confirms = As}) -> + start_ack_timer(State1#ch{held_confirms = gb_sets:add(MSN, As)}) + end, State). + +do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, + State = #ch{unconfirmed = UC, + queues_for_msg = QFM}) -> + %% clears references to MsgSeqNo and does ConfirmFun + case gb_sets:is_element(MsgSeqNo, UC) of + true -> + case QPid of + undefined -> + ConfirmFun(MsgSeqNo, + State#ch{unconfirmed = + gb_sets:delete(MsgSeqNo, UC)}); + _ -> + {ok, Qs} = dict:find(MsgSeqNo, QFM), + Qs1 = sets:del_element(QPid, Qs), + case sets:size(Qs1) of + 0 -> ConfirmFun(MsgSeqNo, + State#ch{ + queues_for_msg = + dict:erase(MsgSeqNo, QFM), + unconfirmed = + gb_sets:delete(MsgSeqNo, UC)}); + _ -> State#ch{queues_for_msg = + dict:store(MsgSeqNo, Qs1, QFM)} + end + end; + false -> State + end. + handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -437,9 +529,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, immediate = Immediate}, - Content, State = #ch{virtual_host = VHostPath, - transaction_id = TxnKey, - writer_pid = WriterPid}) -> + Content, State = #ch{virtual_host = VHostPath, + transaction_id = TxnKey, + confirm_enabled = ConfirmEnabled}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -447,6 +539,15 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), IsPersistent = is_message_persistent(DecodedContent), + {MsgSeqNo, State1} + = case ConfirmEnabled of + false -> {undefined, State}; + true -> Count = State#ch.published_count, + {Count, + State#ch{published_count = Count + 1, + unconfirmed = + gb_sets:add(Count, State#ch.unconfirmed)}} + end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, @@ -455,18 +556,19 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), - case RoutingRes of - routed -> ok; - unroutable -> ok = basic_return(Message, WriterPid, no_route); - not_delivered -> ok = basic_return(Message, WriterPid, no_consumers) - end, + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, + case IsPersistent of + true -> MsgSeqNo; + false -> undefined + end)), + State2 = process_routing_result(RoutingRes, DeliveredQPids, IsPersistent, + MsgSeqNo, Message, State1), maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || - QPid <- DeliveredQPids]], publish, State), + QPid <- DeliveredQPids]], publish, State2), {noreply, case TxnKey of - none -> State; - _ -> add_tx_participants(DeliveredQPids, State) + none -> State2; + _ -> add_tx_participants(DeliveredQPids, State2) end}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, @@ -875,6 +977,11 @@ handle_method(#'queue.purge'{queue = QueueNameBin, return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); + +handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) -> + rabbit_misc:protocol_error( + precondition_failed, "cannot switch from confirm to tx mode", []); + handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none}) -> {reply, #'tx.select_ok'{}, new_tx(State)}; @@ -895,6 +1002,25 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; +handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId}) + when TxId =/= none -> + rabbit_misc:protocol_error( + precondition_failed, "cannot switch from tx to confirm mode", []); + +handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, + _, State = #ch{confirm_enabled = false}) -> + return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple}, + NoWait, #'confirm.select_ok'{}); + +handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, + _, State = #ch{confirm_enabled = true, + confirm_multiple = Multiple}) -> + return_ok(State, NoWait, #'confirm.select_ok'{}); + +handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) -> + rabbit_misc:protocol_error( + precondition_failed, "cannot change confirm_multiple setting", []); + handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter_pid = LimiterPid}) -> LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of @@ -1119,6 +1245,24 @@ is_message_persistent(Content) -> IsPersistent end. +process_routing_result(unroutable, _, _, MsgSeqNo, Message, State) -> + ok = basic_return(Message, State#ch.writer_pid, no_route), + send_or_enqueue_ack(MsgSeqNo, undefined, State); +process_routing_result(not_delivered, _, _, MsgSeqNo, Message, State) -> + ok = basic_return(Message, State#ch.writer_pid, no_consumers), + send_or_enqueue_ack(MsgSeqNo, undefined, State); +process_routing_result(routed, [], _, MsgSeqNo, _, State) -> + send_or_enqueue_ack(MsgSeqNo, undefined, State); +process_routing_result(routed, _, _, undefined, _, State) -> + State; +process_routing_result(routed, _, false, MsgSeqNo, _, State) -> + send_or_enqueue_ack(MsgSeqNo, undefined, State); +process_routing_result(routed, QPids, true, MsgSeqNo, _, + State = #ch{queues_for_msg = QFM}) -> + QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), + [maybe_monitor(QPid) || QPid <- QPids], + State#ch{queues_for_msg = QFM1}. + lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; lock_message(false, _MsgStruct, State) -> @@ -1140,7 +1284,8 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, false -> rabbit_writer:send_command(WriterPid, M, Content) end. -terminate(_State) -> +terminate(State) -> + stop_ack_timer(State), pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]). @@ -1222,3 +1367,47 @@ erase_queue_stats(QPid) -> erase({queue_stats, QPid}), [erase({queue_exchange_stats, QX}) || {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. + +start_ack_timer(State = #ch{confirm_tref = undefined}) -> + {ok, TRef} = timer:apply_after(?FLUSH_MULTIPLE_ACKS_INTERVAL, + ?MODULE, flush_multiple_acks, [self()]), + State#ch{confirm_tref = TRef}; +start_ack_timer(State) -> + State. + +stop_ack_timer(State = #ch{confirm_tref = undefined}) -> + State; +stop_ack_timer(State = #ch{confirm_tref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + State#ch{confirm_tref = undefined}. + +flush_multiple(WriterPid, As, NA) -> + case gb_sets:is_empty(As) of + true -> ok; + false -> [First | Rest] = gb_sets:to_list(As), + [rabbit_writer:send_command(WriterPid, + #'basic.ack'{delivery_tag = A}) || + A <- case Rest of + [] -> [First]; + _ -> flush_multiple( + First, Rest, WriterPid, + case gb_sets:is_empty(NA) of + false -> gb_sets:smallest(NA); + true -> gb_sets:largest(As) + 1 + end) + end], + ok + end. + +flush_multiple(Prev, [Cur | Rest], WriterPid, SNA) -> + ExpNext = Prev + 1, + case {SNA >= Cur, Cur} of + {true, ExpNext} -> flush_multiple(Cur, Rest, WriterPid, SNA); + _ -> flush_multiple(Prev, [], WriterPid, SNA), + [Cur | Rest] + end; +flush_multiple(Prev, [], WriterPid, _) -> + ok = rabbit_writer:send_command(WriterPid, + #'basic.ack'{delivery_tag = Prev, + multiple = true}), + []. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index b9f3e1a3..ba2af20c 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,7 +34,7 @@ -behaviour(gen_server2). -export([start_link/4, successfully_recovered_state/1, - client_init/2, client_terminate/2, client_delete_and_terminate/3, + client_init/3, client_terminate/2, client_delete_and_terminate/3, write/4, read/3, contains/2, remove/2, release/2, sync/3]). -export([sync/1, set_maximum_since_use/2, @@ -82,7 +82,9 @@ cur_file_cache_ets, %% tid of current file cache table client_refs, %% set of references of all registered clients successfully_recovered, %% boolean: did we recover state? - file_size_limit %% how big are our files allowed to get? + file_size_limit, %% how big are our files allowed to get? + client_ondisk_callback, %% client ref to callback function mapping + cref_to_guids %% client ref to synced messages mapping }). -record(client_msstate, @@ -94,7 +96,8 @@ file_handles_ets, file_summary_ets, dedup_cache_ets, - cur_file_cache_ets + cur_file_cache_ets, + client_ref }). -record(file_summary, @@ -132,16 +135,18 @@ file_handles_ets :: ets:tid(), file_summary_ets :: ets:tid(), dedup_cache_ets :: ets:tid(), - cur_file_cache_ets :: ets:tid() }). + cur_file_cache_ets :: ets:tid(), + client_ref :: rabbit_guid:guid()}). -type(startup_fun_state() :: {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), A}). +-type(guid_fun() :: fun (([rabbit_guid:guid()]) -> any())). -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', startup_fun_state()) -> rabbit_types:ok_pid_or_error()). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). --spec(client_init/2 :: (server(), binary()) -> client_msstate()). +-spec(client_init/3 :: (server(), rabbit_guid:guid(), guid_fun()) -> client_msstate()). -spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). -spec(client_delete_and_terminate/3 :: (client_msstate(), server(), binary()) -> 'ok'). @@ -328,10 +333,11 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> successfully_recovered_state(Server) -> gen_server2:call(Server, successfully_recovered_state, infinity). -client_init(Server, Ref) -> +client_init(Server, Ref, MsgOnDiskFun) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = - gen_server2:call(Server, {new_client_state, Ref}, infinity), + gen_server2:call(Server, {new_client_state, Ref, MsgOnDiskFun}, + infinity), #client_msstate { file_handle_cache = dict:new(), index_state = IState, index_module = IModule, @@ -340,20 +346,22 @@ client_init(Server, Ref) -> file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts }. + cur_file_cache_ets = CurFileCacheEts, + client_ref = Ref }. client_terminate(CState, Server) -> close_all_handles(CState), - ok = gen_server2:call(Server, client_terminate, infinity). + ok = gen_server2:call(Server, {client_terminate, CState}, infinity). client_delete_and_terminate(CState, Server, Ref) -> close_all_handles(CState), ok = gen_server2:cast(Server, {client_delete, Ref}). write(Server, Guid, Msg, - CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> + CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, + client_ref = CRef }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - {gen_server2:cast(Server, {write, Guid}), CState}. + {gen_server2:cast(Server, {write, CRef, Guid}), CState}. read(Server, Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, @@ -506,6 +514,13 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, end end. +clear_client_callback(CRef, + State = #msstate { client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> + State #msstate { client_ondisk_callback = dict:erase(CRef, CODC), + cref_to_guids = dict:erase(CRef, CTG)}. + + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -573,7 +588,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> cur_file_cache_ets = CurFileCacheEts, client_refs = ClientRefs1, successfully_recovered = CleanShutdown, - file_size_limit = FileSizeLimit + file_size_limit = FileSizeLimit, + client_ondisk_callback = dict:new(), + cref_to_guids = dict:new() }, %% If we didn't recover the msg location index then we need to @@ -602,10 +619,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> prioritise_call(Msg, _From, _State) -> case Msg of - successfully_recovered_state -> 7; - {new_client_state, _Ref} -> 7; - {read, _Guid} -> 2; - _ -> 0 + successfully_recovered_state -> 7; + {new_client_state, _Ref, _MODC} -> 7; + {read, _Guid} -> 2; + _ -> 0 end. prioritise_cast(Msg, _State) -> @@ -620,22 +637,29 @@ prioritise_cast(Msg, _State) -> handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); -handle_call({new_client_state, CRef}, _From, - State = #msstate { dir = Dir, - index_state = IndexState, - index_module = IndexModule, - file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts, - client_refs = ClientRefs, - gc_pid = GCPid }) -> +handle_call({new_client_state, CRef, Callback}, _From, + State = #msstate { dir = Dir, + index_state = IndexState, + index_module = IndexModule, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts, + client_refs = ClientRefs, + client_ondisk_callback = CODC, + gc_pid = GCPid }) -> + CODC1 = case Callback of + undefined -> CODC; + _ -> dict:store(CRef, Callback, CODC) + end, reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts}, - State #msstate { client_refs = sets:add_element(CRef, ClientRefs) }); + State #msstate { client_refs = sets:add_element(CRef, ClientRefs), + client_ondisk_callback = CODC1 }); -handle_call(client_terminate, _From, State) -> - reply(ok, State); +handle_call({client_terminate, #client_msstate { client_ref = CRef }}, _From, + State) -> + reply(ok, clear_client_callback(CRef, State)); handle_call({read, Guid}, From, State) -> State1 = read_message(Guid, From, State), @@ -647,36 +671,51 @@ handle_call({contains, Guid}, From, State) -> handle_cast({client_delete, CRef}, State = #msstate { client_refs = ClientRefs }) -> - noreply( - State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }); + State1 = clear_client_callback(CRef, State), + noreply(State1 #msstate { + client_refs = sets:del_element(CRef, ClientRefs) }); + +handle_cast({write, CRef, Guid}, + State = #msstate { sum_valid_data = SumValid, + file_summary_ets = FileSummaryEts, + current_file = CurFile, + cur_file_cache_ets = CurFileCacheEts, + client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> -handle_cast({write, Guid}, - State = #msstate { sum_valid_data = SumValid, - file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), - case index_lookup(Guid, State) of + CTG1 = case dict:find(CRef, CODC) of + {ok, _} -> rabbit_misc:dict_cons(CRef, Guid, CTG); + error -> CTG + end, + State1 = State #msstate { cref_to_guids = CTG1 }, + case index_lookup(Guid, State1) of not_found -> - write_message(Guid, Msg, State); + write_message(Guid, Msg, State1); #msg_location { ref_count = 0, file = File, total_size = TotalSize } -> case ets:lookup(FileSummaryEts, File) of [#file_summary { locked = true }] -> - ok = index_delete(Guid, State), - write_message(Guid, Msg, State); + ok = index_delete(Guid, State1), + write_message(Guid, Msg, State1); [#file_summary {}] -> - ok = index_update_ref_count(Guid, 1, State), + ok = index_update_ref_count(Guid, 1, State1), [_] = ets:update_counter( FileSummaryEts, File, [{#file_summary.valid_total_size, TotalSize}]), - noreply(State #msstate { + noreply(State1 #msstate { sum_valid_data = SumValid + TotalSize }) end; - #msg_location { ref_count = RefCount } -> + #msg_location { ref_count = RefCount, file = File } -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC - ok = index_update_ref_count(Guid, RefCount + 1, State), - noreply(State) + ok = index_update_ref_count(Guid, RefCount + 1, State1), + CTG2 = case {dict:find(CRef, CODC), File} of + {{ok, _}, CurFile} -> CTG1; + {{ok, Fun}, _} -> Fun([Guid]), CTG; + _ -> CTG1 + end, + noreply(State #msstate { cref_to_guids = CTG2 }) end; handle_cast({remove, Guids}, State) -> @@ -781,14 +820,19 @@ reply(Reply, State) -> {State1, Timeout} = next_state(State), {reply, Reply, State1, Timeout}. -next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) -> - {State, hibernate}; -next_state(State = #msstate { sync_timer_ref = undefined }) -> - {start_sync_timer(State), 0}; -next_state(State = #msstate { on_sync = [] }) -> - {stop_sync_timer(State), hibernate}; -next_state(State) -> - {State, 0}. +next_state(State = #msstate { sync_timer_ref = undefined, + on_sync = Syncs, + cref_to_guids = CTG }) -> + case {Syncs, dict:size(CTG)} of + {[], 0} -> {State, hibernate}; + _ -> {start_sync_timer(State), 0} + end; +next_state(State = #msstate { on_sync = Syncs, + cref_to_guids = CTG }) -> + case {Syncs, dict:size(CTG)} of + {[], 0} -> {stop_sync_timer(State), hibernate}; + _ -> {State, 0} + end. start_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, [self()]), @@ -800,15 +844,21 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> {ok, cancel} = timer:cancel(TRef), State #msstate { sync_timer_ref = undefined }. -internal_sync(State = #msstate { current_file_handle = CurHdl, - on_sync = Syncs }) -> +internal_sync(State = #msstate { current_file_handle = CurHdl, + on_sync = Syncs, + client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> State1 = stop_sync_timer(State), - case Syncs of - [] -> State1; - _ -> ok = file_handle_cache:sync(CurHdl), - lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), - State1 #msstate { on_sync = [] } - end. + CGs = dict:fold(fun (_CRef, [], NS) -> NS; + (CRef, Guids, NS) -> [{CRef, Guids} | NS] + end, [], CTG), + if Syncs =:= [] andalso CGs =:= [] -> ok; + true -> file_handle_cache:sync(CurHdl) + end, + lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), + [(dict:fetch(CRef, CODC))(Guids) || {CRef, Guids} <- CGs], + State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. + write_message(Guid, Msg, State = #msstate { current_file_handle = CurHdl, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 1b837128..b58c5a60 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/4, terminate/2, delete_and_terminate/1, publish/5, +-export([init/5, terminate/2, delete_and_terminate/1, publish/5, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). @@ -171,7 +171,7 @@ %%---------------------------------------------------------------------------- -record(qistate, { dir, segments, journal_handle, dirty_count, - max_journal_entries }). + max_journal_entries, on_sync, unsynced_guids }). -record(segment, { num, path, journal_entries, unacked }). @@ -190,18 +190,21 @@ })). -type(seq_id() :: integer()). -type(seg_dict() :: {dict:dictionary(), [segment()]}). +-type(on_sync_fun() :: fun (([rabbit_guid:guid()]) -> ok)). -type(qistate() :: #qistate { dir :: file:filename(), segments :: 'undefined' | seg_dict(), journal_handle :: hdl(), dirty_count :: integer(), - max_journal_entries :: non_neg_integer() + max_journal_entries :: non_neg_integer(), + on_sync :: on_sync_fun(), + unsynced_guids :: [rabbit_guid:guid()] }). -type(startup_fun_state() :: - {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), + {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}), A}). --spec(init/4 :: (rabbit_amqqueue:name(), boolean(), boolean(), - fun ((rabbit_guid:guid()) -> boolean())) -> +-spec(init/5 :: (rabbit_amqqueue:name(), boolean(), boolean(), + fun ((rabbit_guid:guid()) -> boolean()), on_sync_fun()) -> {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). @@ -229,12 +232,12 @@ %% public API %%---------------------------------------------------------------------------- -init(Name, false, _MsgStoreRecovered, _ContainsCheckFun) -> +init(Name, false, _MsgStoreRecovered, _ContainsCheckFun, OnSyncFun) -> State = #qistate { dir = Dir } = blank_state(Name), false = filelib:is_file(Dir), %% is_file == is file or dir - {0, [], State}; + {0, [], State #qistate { on_sync = OnSyncFun }}; -init(Name, true, MsgStoreRecovered, ContainsCheckFun) -> +init(Name, true, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) -> State = #qistate { dir = Dir } = blank_state(Name), Terms = case read_shutdown_terms(Dir) of {error, _} -> []; @@ -247,7 +250,7 @@ init(Name, true, MsgStoreRecovered, ContainsCheckFun) -> init_clean(RecoveredCounts, State); false -> init_dirty(CleanShutdown, ContainsCheckFun, State) end, - {Count, Terms, State1}. + {Count, Terms, State1 #qistate { on_sync = OnSyncFun }}. terminate(Terms, State) -> {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), @@ -259,9 +262,13 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(Guid, SeqId, MsgProps, IsPersistent, State) when is_binary(Guid) -> +publish(Guid, SeqId, MsgProps, IsPersistent, + State = #qistate { unsynced_guids = UnsyncedGuids }) + when is_binary(Guid) -> ?GUID_BYTES = size(Guid), - {JournalHdl, State1} = get_journal_handle(State), + {JournalHdl, State1} = get_journal_handle( + State #qistate { + unsynced_guids = [Guid | UnsyncedGuids] }), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; @@ -292,7 +299,7 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> %% seqids not being in the journal, provided the transaction isn't %% emptied (handled above anyway). ok = file_handle_cache:sync(JournalHdl), - State. + notify_sync(State). flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). @@ -381,7 +388,9 @@ blank_state(QueueName) -> segments = segments_new(), journal_handle = undefined, dirty_count = 0, - max_journal_entries = MaxJournal }. + max_journal_entries = MaxJournal, + on_sync = fun (_) -> ok end, + unsynced_guids = [] }. clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). @@ -613,7 +622,7 @@ flush_journal(State = #qistate { segments = Segments }) -> {JournalHdl, State1} = get_journal_handle(State #qistate { segments = Segments1 }), ok = file_handle_cache:clear(JournalHdl), - State1 #qistate { dirty_count = 0 }. + notify_sync(State1 #qistate { dirty_count = 0 }). append_journal_to_segment(#segment { journal_entries = JEntries, path = Path } = Segment) -> @@ -701,6 +710,10 @@ deliver_or_ack(Kind, SeqIds, State) -> add_to_journal(SeqId, Kind, StateN) end, State1, SeqIds)). +notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) -> + OnSyncFun(UG), + State #qistate { unsynced_guids = [] }. + %%---------------------------------------------------------------------------- %% segment manipulation %%---------------------------------------------------------------------------- diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 00df1ce1..4a1a08e4 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -72,17 +72,19 @@ deliver(QNames, Delivery = #delivery{mandatory = false, QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {routed, QPids}; -deliver(QNames, Delivery) -> +deliver(QNames, Delivery = #delivery{mandatory = Mandatory, + immediate = Immediate}) -> QPids = lookup_qpids(QNames), - {Success, _} = - delegate:invoke(QPids, - fun (Pid) -> - rabbit_amqqueue:deliver(Pid, Delivery) - end), + {Success, _} = delegate:invoke( + QPids, fun (Pid) -> + rabbit_amqqueue:deliver(Pid, Delivery) + end), {Routed, Handled} = - lists:foldl(fun fold_deliveries/2, {false, []}, Success), - check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - {Routed, Handled}). + lists:foldl(fun fold_deliveries/2, {false, []}, Success), + case check_delivery(Mandatory, Immediate, {Routed, Handled}) of + {routed, Qs} -> {routed, Qs}; + O -> O + end. %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same source diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 435fdfac..5986e11e 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1470,7 +1470,7 @@ msg_store_remove(Guids) -> foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> rabbit_msg_store:client_terminate( lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end, - rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore). + rabbit_msg_store:client_init(MsgStore, Ref, undefined), L), MsgStore). test_msg_store() -> restart_msg_store_empty(), @@ -1480,7 +1480,7 @@ test_msg_store() -> %% check we don't contain any of the msgs we're about to publish false = msg_store_contains(false, Guids), Ref = rabbit_guid:guid(), - MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, undefined), %% publish the first half {ok, MSCState1} = msg_store_write(Guids1stHalf, MSCState), %% sync on the first half @@ -1554,7 +1554,7 @@ test_msg_store() -> %% check we don't contain any of the msgs false = msg_store_contains(false, Guids), %% publish the first half again - MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, undefined), {ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8), %% this should force some sort of sync internally otherwise misread ok = rabbit_msg_store:client_terminate( @@ -1608,6 +1608,9 @@ init_test_queue() -> test_queue(), true, false, fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) + end, + fun (_) -> + ok %% Sync! end). restart_test_queue(Qi) -> @@ -1643,7 +1646,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid, Guid, MSCStateN), {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM} - end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds), + end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref, undefined)}, SeqIds), ok = rabbit_msg_store:client_delete_and_terminate( MSCStateEnd, MsgStore, Ref), {A, B}. @@ -1827,7 +1830,8 @@ assert_props(List, PropVals) -> with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), - VQ = rabbit_variable_queue:init(test_queue(), true, false), + VQ = rabbit_variable_queue:init(test_queue(), true, false, + fun nop/1, fun nop/1), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -1882,7 +1886,6 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% start by sending in a couple of segments worth Len = 2*SegmentSize, VQ1 = variable_queue_publish(false, Len, VQ0), - %% squeeze and relax queue Churn = Len div 32, VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), @@ -1900,7 +1903,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags, VQ8), + {_, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -1910,7 +1913,8 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)). + {_, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), + publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), @@ -1943,7 +1947,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + {_, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -1972,7 +1976,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + fun nop/1, fun nop/1), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -1988,7 +1993,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + fun nop/1, fun nop/1), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -2018,10 +2024,13 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), - VQ1 = rabbit_variable_queue:init(QName, true, true), + VQ1 = rabbit_variable_queue:init(QName, true, true, + fun nop/1, fun nop/1), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), rabbit_amqqueue:internal_delete(QName) end), passed. + +nop(_) -> ok. diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 91f2c4ca..b037b46c 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -89,7 +89,8 @@ sender :: pid(), message :: message()}). -type(message_properties() :: - #message_properties{expiry :: pos_integer() | 'undefined'}). + #message_properties{expiry :: pos_integer() | 'undefined', + needs_confirming :: boolean()}). %% this is really an abstract type, but dialyzer does not support them -type(txn() :: rabbit_guid:guid()). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index a1c442d3..bd7be940 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,7 +31,7 @@ -module(rabbit_variable_queue). --export([init/3, terminate/1, delete_and_terminate/1, +-export([init/5, init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, @@ -236,8 +236,11 @@ ram_index_count, out_counter, in_counter, - rates - }). + rates, + msgs_on_disk, + msg_indices_on_disk, + unconfirmed + }). -record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }). @@ -324,7 +327,10 @@ ram_index_count :: non_neg_integer(), out_counter :: non_neg_integer(), in_counter :: non_neg_integer(), - rates :: rates() }). + rates :: rates(), + msgs_on_disk :: gb_set(), + msg_indices_on_disk :: gb_set(), + unconfirmed :: gb_set()}). -include("rabbit_backing_queue_spec.hrl"). @@ -371,13 +377,21 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). init(QueueName, IsDurable, Recover) -> + Self = self(), + init(QueueName, IsDurable, Recover, + fun (Guids) -> msgs_written_to_disk(Self, Guids) end, + fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end). + +init(QueueName, IsDurable, Recover, + MsgOnDiskFun, MsgIdxOnDiskFun) -> {DeltaCount, Terms, IndexState} = rabbit_queue_index:init( QueueName, Recover, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) - end), + end, + MsgIdxOnDiskFun), {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), {PRef, TRef, Terms1} = @@ -395,12 +409,16 @@ init(QueueName, IsDurable, Recover) -> end_seq_id = NextSeqId } end, Now = now(), + PersistentClient = case IsDurable of - true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef); + true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef, + MsgOnDiskFun); false -> undefined end, - TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), + TransientClient = + rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef, undefined), + State = #vqstate { q1 = queue:new(), q2 = bpqueue:new(), @@ -430,7 +448,10 @@ init(QueueName, IsDurable, Recover) -> ingress = {Now, DeltaCount1}, avg_egress = 0.0, avg_ingress = 0.0, - timestamp = Now } }, + timestamp = Now }, + msgs_on_disk = gb_sets:new(), + msg_indices_on_disk = gb_sets:new(), + unconfirmed = gb_sets:new() }, a(maybe_deltas_to_betas(State)). terminate(State) -> @@ -505,26 +526,30 @@ publish(Msg, MsgProps, State) -> publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; -publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, - MsgProps, +publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, + guid = Guid }, + MsgProps = #message_properties { needs_confirming = NeedsConfirming }, State = #vqstate { len = 0, next_seq_id = SeqId, out_counter = OutCount, in_counter = InCount, persistent_count = PCount, pending_ack = PA, - durable = IsDurable }) -> + durable = IsDurable, + unconfirmed = Unconfirmed }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), PA1 = record_pending_ack(m(MsgStatus1), PA), PCount1 = PCount + one_if(IsPersistent1), + Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed), {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1, persistent_count = PCount1, - pending_ack = PA1 })}. + pending_ack = PA1, + unconfirmed = Unconfirmed1 })}. dropwhile(Pred, State) -> {_OkOrEmpty, State1} = dropwhile1(Pred, State), @@ -634,9 +659,10 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { pending_ack = PA1 })}. ack(AckTags, State) -> - a(ack(fun rabbit_msg_store:remove/2, - fun (_AckEntry, State1) -> State1 end, - AckTags, State)). + {Guids, State1} = ack(fun rabbit_msg_store:remove/2, + fun (_AckEntry, State1) -> State1 end, + AckTags, State), + {Guids, a(State1)}. tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, State = #vqstate { durable = IsDurable, @@ -684,7 +710,7 @@ tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> end)}. requeue(AckTags, MsgPropsFun, State) -> - a(reduce_memory_use( + {_Guids, State1} = ack(fun rabbit_msg_store:release/2, fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps), @@ -699,7 +725,8 @@ requeue(AckTags, MsgPropsFun, State) -> true, true, State2), State3 end, - AckTags, State))). + AckTags, State), + a(reduce_memory_use(State1)). len(#vqstate { len = Len }) -> Len. @@ -844,6 +871,9 @@ one_if(false) -> 0. cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. +gb_sets_maybe_insert(false, _Val, Set) -> Set; +gb_sets_maybe_insert(true, Val, Set) -> gb_sets:insert(Val, Set). + msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, MsgProps) -> #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, @@ -1013,6 +1043,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { durable = IsDurable }) -> PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), + {_Guids, NewState} = ack(Acks, State), Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), {Msg, MsgProps} <- lists:reverse(PubsN)], {SeqIds, State1 = #vqstate { index_state = IndexState }} = @@ -1024,7 +1055,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { {SeqId, State3} = publish(Msg, MsgProps, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} - end, {PAcks, ack(Acks, State)}, Pubs), + end, {PAcks, NewState}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- lists:reverse(SFuns) ], reduce_memory_use( @@ -1078,15 +1109,17 @@ sum_guids_by_store_to_len(LensByStore, GuidsByStore) -> %% Internal gubbins for publishing %%---------------------------------------------------------------------------- -publish(Msg = #basic_message { is_persistent = IsPersistent }, - MsgProps, IsDelivered, MsgOnDisk, +publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, + MsgProps = #message_properties { needs_confirming = NeedsConfirming }, + IsDelivered, MsgOnDisk, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, len = Len, in_counter = InCount, persistent_count = PCount, durable = IsDurable, - ram_msg_count = RamMsgCount }) -> + ram_msg_count = RamMsgCount, + unconfirmed = Unconfirmed}) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, @@ -1096,11 +1129,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), + Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed), {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, len = Len + 1, in_counter = InCount + 1, persistent_count = PCount1, - ram_msg_count = RamMsgCount + 1}}. + ram_msg_count = RamMsgCount + 1, + unconfirmed = Unconfirmed1 }}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, MSCState) -> @@ -1189,7 +1224,7 @@ remove_pending_ack(KeepPersistent, end. ack(_MsgStoreFun, _Fun, [], State) -> - State; + {[], State}; ack(MsgStoreFun, Fun, AckTags, State) -> {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, persistent_count = PCount }} = @@ -1201,13 +1236,16 @@ ack(MsgStoreFun, Fun, AckTags, State) -> pending_ack = dict:erase(SeqId, PA) })} end, {{[], orddict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - ok = orddict:fold(fun (MsgStore, Guids, ok) -> - MsgStoreFun(MsgStore, Guids) - end, ok, GuidsByStore), + AckdGuids = lists:concat( + orddict:fold(fun (MsgStore, Guids, Gs) -> + MsgStoreFun(MsgStore, Guids), + [Guids | Gs] + end, [], GuidsByStore)), + State2 = remove_confirms(gb_sets:from_list(AckdGuids), State1), PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), - State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1 }. + {AckdGuids, State2 #vqstate { index_state = IndexState1, + persistent_count = PCount1 }}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, @@ -1224,6 +1262,46 @@ find_persistent_count(LensByStore) -> end. %%---------------------------------------------------------------------------- +%% Internal plumbing for confirms (aka publisher acks) +%%---------------------------------------------------------------------------- + +remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet), + msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet), + unconfirmed = gb_sets:difference(UC, GuidSet) }. + +msgs_confirmed(GuidSet, State) -> + {remove_confirms(GuidSet, State), {confirm, gb_sets:to_list(GuidSet)}}. + +msgs_written_to_disk(QPid, Guids) -> + rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + QPid, fun(State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + GuidSet = gb_sets:from_list(Guids), + msgs_confirmed(gb_sets:intersection(GuidSet, MIOD), + State #vqstate { + msgs_on_disk = + gb_sets:intersection( + gb_sets:union(MOD, GuidSet), UC) }) + end). + +msg_indices_written_to_disk(QPid, Guids) -> + rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + QPid, fun(State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + GuidSet = gb_sets:from_list(Guids), + msgs_confirmed(gb_sets:intersection(GuidSet, MOD), + State #vqstate { + msg_indices_on_disk = + gb_sets:intersection( + gb_sets:union(MIOD, GuidSet), UC) }) + end). + +%%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- |