diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-06-26 18:21:18 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-06-26 18:21:18 +0100 |
commit | ed7c2e337d8ce533ad256a841fb90c3e045c5d61 (patch) | |
tree | c60c02ba75134c0b8b5639c56e12c93488f3f508 | |
parent | 398ae196b2376f17d161a1e8fc5775ddaedb5493 (diff) | |
parent | 907bdbfb4bd9a099882379cfd09e31f2672d772b (diff) | |
download | rabbitmq-server-ed7c2e337d8ce533ad256a841fb90c3e045c5d61.tar.gz |
merge default into bug24216
-rw-r--r-- | docs/rabbitmqctl.1.xml | 12 | ||||
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 17 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 32 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 127 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 21 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 32 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 141 | ||||
-rw-r--r-- | src/rabbit_control.erl | 2 | ||||
-rw-r--r-- | src/rabbit_error_logger.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 41 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 34 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 37 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 18 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 49 | ||||
-rw-r--r-- | src/rabbit_types.erl | 6 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 237 |
17 files changed, 153 insertions, 658 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index a0f03192..fdb49912 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1196,10 +1196,6 @@ <listitem><para>Virtual host in which the channel operates.</para></listitem> </varlistentry> <varlistentry> - <term>transactional</term> - <listitem><para>True if the channel is in transactional mode, false otherwise.</para></listitem> - </varlistentry> - <varlistentry> <term>consumer_count</term> <listitem><para>Number of logical AMQP consumers retrieving messages via the channel.</para></listitem> @@ -1210,11 +1206,6 @@ yet acknowledged.</para></listitem> </varlistentry> <varlistentry> - <term>acks_uncommitted</term> - <listitem><para>Number of acknowledgements received in an as yet - uncommitted transaction.</para></listitem> - </varlistentry> - <varlistentry> <term>prefetch_count</term> <listitem><para>QoS prefetch count limit in force, 0 if unlimited.</para></listitem> </varlistentry> @@ -1239,8 +1230,7 @@ </variablelist> <para> If no <command>channelinfoitem</command>s are specified then pid, - user, transactional, consumer_count, and - messages_unacknowledged are assumed. + user, consumer_count, and messages_unacknowledged are assumed. </para> <para role="example-prefix"> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 00b7e6e9..3861df2a 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -67,8 +67,7 @@ is_persistent}). -record(ssl_socket, {tcp, ssl}). --record(delivery, {mandatory, immediate, txn, sender, message, - msg_seq_no}). +-record(delivery, {mandatory, immediate, sender, message, msg_seq_no}). -record(amqp_error, {name, explanation = "", method = none}). -record(event, {type, props, timestamp}). diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 295d9039..ee102f5e 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -26,12 +26,11 @@ fun ((rabbit_types:message_properties()) -> rabbit_types:message_properties())). -type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). --type(sync_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok' | 'error')). -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(), - async_callback(), sync_callback()) -> state()). +-spec(init/3 :: (rabbit_types:amqqueue(), attempt_recovery(), + async_callback()) -> state()). -spec(terminate/2 :: (any(), state()) -> state()). -spec(delete_and_terminate/2 :: (any(), state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). @@ -51,14 +50,6 @@ -spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()}; (false, state()) -> {fetch_result(undefined), state()}). -spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). --spec(tx_publish/5 :: (rabbit_types:txn(), rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), state()) -> - state()). --spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). --spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}). --spec(tx_commit/4 :: - (rabbit_types:txn(), fun (() -> any()), - message_properties_transformer(), state()) -> {[ack()], state()}). -spec(requeue/3 :: ([ack()], message_properties_transformer(), state()) -> {[rabbit_guid:guid()], state()}). -spec(len/1 :: (state()) -> non_neg_integer()). @@ -71,7 +62,7 @@ -spec(handle_pre_hibernate/1 :: (state()) -> state()). -spec(status/1 :: (state()) -> [{atom(), any()}]). -spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()). --spec(is_duplicate/3 :: - (rabbit_types:txn(), rabbit_types:basic_message(), state()) -> +-spec(is_duplicate/2 :: + (rabbit_types:basic_message(), state()) -> {'false'|'published'|'discarded', state()}). -spec(discard/3 :: (rabbit_types:basic_message(), pid(), state()) -> state()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index bacb1d21..e9d01d12 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -20,19 +20,18 @@ -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, - stat/1, deliver/2, requeue/3, ack/4, reject/4]). + stat/1, deliver/2, requeue/3, ack/3, reject/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, unblock/2, flush_all/2]). --export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). +-export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -export([store_queue/1]). %% internal --export([internal_declare/2, internal_delete/1, - run_backing_queue/3, run_backing_queue_async/3, +-export([internal_declare/2, internal_delete/1, run_backing_queue/3, sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2, maybe_expire/1, drop_expired/1, emit_stats/1]). @@ -117,12 +116,8 @@ -spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()). -spec(deliver/2 :: (pid(), rabbit_types:delivery()) -> boolean()). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). --spec(ack/4 :: - (pid(), rabbit_types:maybe(rabbit_types:txn()), [msg_id()], pid()) - -> 'ok'). +-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). --spec(commit_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> ok_or_errors()). --spec(rollback_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> 'ok'). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> @@ -147,9 +142,6 @@ -spec(run_backing_queue/3 :: (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). --spec(run_backing_queue_async/3 :: - (pid(), atom(), - (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). -spec(sync_timeout/1 :: (pid()) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). @@ -436,21 +428,12 @@ deliver(QPid, Delivery) -> requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}). -ack(QPid, Txn, MsgIds, ChPid) -> - delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}). +ack(QPid, MsgIds, ChPid) -> + delegate_cast(QPid, {ack, MsgIds, ChPid}). reject(QPid, MsgIds, Requeue, ChPid) -> delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}). -commit_all(QPids, Txn, ChPid) -> - safe_delegate_call_ok( - fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end, - QPids). - -rollback_all(QPids, Txn, ChPid) -> - delegate:invoke_no_result( - QPids, fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end). - notify_down_all(QPids, ChPid) -> safe_delegate_call_ok( fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end, @@ -501,9 +484,6 @@ internal_delete(QueueName) -> end). run_backing_queue(QPid, Mod, Fun) -> - gen_server2:call(QPid, {run_backing_queue, Mod, Fun}, infinity). - -run_backing_queue_async(QPid, Mod, Fun) -> gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). sync_timeout(QPid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c1fa048d..3e2bbf8d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -62,7 +62,6 @@ monitor_ref, acktags, is_limit_active, - txn, unsent_message_count}). -define(STATISTICS_KEYS, @@ -193,14 +192,7 @@ bq_init(BQ, Q, Recover) -> Self = self(), BQ:init(Q, Recover, fun (Mod, Fun) -> - rabbit_amqqueue:run_backing_queue_async(Self, Mod, Fun) - end, - fun (Mod, Fun) -> - rabbit_misc:with_exit_handler( - fun () -> error end, - fun () -> - rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) - end) + rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) end). process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> @@ -217,22 +209,14 @@ init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}). terminate_shutdown(Fun, State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = + State1 = #q{backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), case BQS of undefined -> State; _ -> ok = rabbit_memory_monitor:deregister(self()), - BQS1 = lists:foldl( - fun (#cr{txn = none}, BQSN) -> - BQSN; - (#cr{txn = Txn}, BQSN) -> - {_AckTags, BQSN1} = - BQ:tx_rollback(Txn, BQSN), - BQSN1 - end, BQS, all_ch_record()), [emit_consumer_deleted(Ch, CTag) || {Ch, CTag, _} <- consumers(State1)], - State1#q{backing_queue_state = Fun(BQS1)} + State1#q{backing_queue_state = Fun(BQS)} end. reply(Reply, NewState) -> @@ -343,7 +327,6 @@ ch_record(ChPid) -> monitor_ref = MonitorRef, acktags = sets:new(), is_limit_active = false, - txn = none, unsent_message_count = 0}, put(Key, C), C; @@ -355,13 +338,12 @@ store_ch_record(C = #cr{ch_pid = ChPid}) -> maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount, acktags = ChAckTags, - txn = Txn, unsent_message_count = UnsentMessageCount}) -> - case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount, Txn} of - {0, 0, 0, none} -> ok = erase_ch_record(C), - false; - _ -> store_ch_record(C), - true + case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of + {0, 0, 0} -> ok = erase_ch_record(C), + false; + _ -> store_ch_record(C), + true end. erase_ch_record(#cr{ch_pid = ChPid, @@ -513,8 +495,7 @@ run_message_queue(State) -> {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), State2. -attempt_delivery(Delivery = #delivery{txn = none, - sender = ChPid, +attempt_delivery(Delivery = #delivery{sender = ChPid, message = Message, msg_seq_no = MsgSeqNo}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -523,7 +504,7 @@ attempt_delivery(Delivery = #delivery{txn = none, immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); _ -> ok end, - case BQ:is_duplicate(none, Message, BQS) of + case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = @@ -555,24 +536,6 @@ attempt_delivery(Delivery = #delivery{txn = none, discarded -> false end, {Delivered, Confirm, State#q{backing_queue_state = BQS1}} - end; -attempt_delivery(Delivery = #delivery{txn = Txn, - sender = ChPid, - message = Message}, - State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - Confirm = should_confirm_message(Delivery, State), - case BQ:is_duplicate(Txn, Message, BQS) of - {false, BQS1} -> - store_ch_record((ch_record(ChPid))#cr{txn = Txn}), - BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid, - BQS1), - {true, Confirm, State#q{backing_queue_state = BQS2}}; - {Duplicate, BQS1} -> - Delivered = case Duplicate of - published -> true; - discarded -> false - end, - {Delivered, Confirm, State#q{backing_queue_state = BQS1}} end. deliver_or_enqueue(Delivery = #delivery{message = Message, @@ -652,7 +615,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> case lookup_ch(DownPid) of not_found -> {ok, State}; - C = #cr{ch_pid = ChPid, txn = Txn, acktags = ChAckTags} -> + C = #cr{ch_pid = ChPid, acktags = ChAckTags} -> ok = erase_ch_record(C), State1 = State#q{ exclusive_consumer = case Holder of @@ -665,13 +628,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> ChPid, State#q.blocked_consumers)}, case should_auto_delete(State1) of true -> {stop, State1}; - false -> State2 = case Txn of - none -> State1; - _ -> rollback_transaction(Txn, C, - State1) - end, - {ok, requeue_and_run(sets:to_list(ChAckTags), - ensure_expiry_timer(State2))} + false -> {ok, requeue_and_run(sets:to_list(ChAckTags), + ensure_expiry_timer(State1))} end end. @@ -705,25 +663,6 @@ run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}). -commit_transaction(Txn, From, C = #cr{acktags = ChAckTags}, - State = #q{backing_queue = BQ, - backing_queue_state = BQS, - ttl = TTL}) -> - {AckTags, BQS1} = BQ:tx_commit( - Txn, fun () -> gen_server2:reply(From, ok) end, - reset_msg_expiry_fun(TTL), BQS), - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), - State#q{backing_queue_state = BQS1}. - -rollback_transaction(Txn, C, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), - %% Iff we removed acktags from the channel record on ack+txn then - %% we would add them back in here. - maybe_store_ch_record(C#cr{txn = none}), - State#q{backing_queue_state = BQS1}. - subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). @@ -848,7 +787,6 @@ prioritise_call(Msg, _From, _State) -> info -> 9; {info, _Items} -> 9; consumers -> 9; - {run_backing_queue, _Mod, _Fun} -> 6; _ -> 0 end. @@ -861,7 +799,7 @@ prioritise_cast(Msg, _State) -> maybe_expire -> 8; drop_expired -> 8; emit_stats -> 7; - {ack, _Txn, _AckTags, _ChPid} -> 7; + {ack, _AckTags, _ChPid} -> 7; {reject, _AckTags, _Requeue, _ChPid} -> 7; {notify_sent, _ChPid} -> 7; {unblock, _ChPid} -> 7; @@ -933,13 +871,6 @@ handle_call({deliver, Delivery}, From, State) -> gen_server2:reply(From, true), noreply(deliver_or_enqueue(Delivery, State)); -handle_call({commit, Txn, ChPid}, From, State) -> - case lookup_ch(ChPid) of - not_found -> reply(ok, State); - C -> noreply(run_message_queue( - commit_transaction(Txn, From, C, State))) - end; - handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues %% are no longer visible by the time we send a response to the @@ -1079,11 +1010,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(requeue_and_run(AckTags, State)) - end; - -handle_call({run_backing_queue, Mod, Fun}, _From, State) -> - reply(ok, run_backing_queue(Mod, Fun, State)). - + end. handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -1095,24 +1022,16 @@ handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. noreply(deliver_or_enqueue(Delivery, State)); -handle_cast({ack, Txn, AckTags, ChPid}, +handle_cast({ack, AckTags, ChPid}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case lookup_ch(ChPid) of not_found -> noreply(State); C = #cr{acktags = ChAckTags} -> - {C1, State1} = - case Txn of - none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), - NewC = C#cr{acktags = ChAckTags1}, - {_Guids, BQS1} = BQ:ack(AckTags, BQS), - {NewC, State#q{backing_queue_state = BQS1}}; - _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS), - {C#cr{txn = Txn}, - State#q{backing_queue_state = BQS1}} - end, - maybe_store_ch_record(C1), - noreply(State1) + maybe_store_ch_record(C#cr{acktags = subtract_acks( + ChAckTags, AckTags)}), + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + noreply(State#q{backing_queue_state = BQS1}) end; handle_cast({reject, AckTags, Requeue, ChPid}, @@ -1131,12 +1050,6 @@ handle_cast({reject, AckTags, Requeue, ChPid}, end) end; -handle_cast({rollback, Txn, ChPid}, State) -> - noreply(case lookup_ch(ChPid) of - not_found -> State; - C -> rollback_transaction(Txn, C, State) - end); - handle_cast(delete_immediately, State) -> {stop, normal, State}; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 217ad3eb..77278416 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -44,9 +44,7 @@ behaviour_info(callbacks) -> %% makes it useful for passing messages back into the backing %% queue, especially as the backing queue does not have %% control of its own mailbox. - %% 4. a synchronous callback. Same as the asynchronous callback - %% but waits for completion and returns 'error' on error. - {init, 4}, + {init, 3}, %% Called on queue shutdown when queue isn't being deleted. {terminate, 2}, @@ -107,21 +105,6 @@ behaviour_info(callbacks) -> %% about. Must return 1 msg_id per Ack, in the same order as Acks. {ack, 2}, - %% A publish, but in the context of a transaction. - {tx_publish, 5}, - - %% Acks, but in the context of a transaction. - {tx_ack, 3}, - - %% Undo anything which has been done in the context of the - %% specified transaction. - {tx_rollback, 2}, - - %% Commit a transaction. The Fun passed in must be called once - %% the messages have really been commited. This CPS permits the - %% possibility of commit coalescing. - {tx_commit, 4}, - %% Reinsert messages into the queue which have already been %% delivered and were pending acknowledgement. {requeue, 3}, @@ -175,7 +158,7 @@ behaviour_info(callbacks) -> %% the BQ to signal that it's already seen this message (and in %% what capacity - i.e. was it published previously or discarded %% previously) and thus the message should be dropped. - {is_duplicate, 3}, + {is_duplicate, 2}, %% Called to inform the BQ about messages which have reached the %% queue, but are not going to be further passed to BQ for some diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index fa7e3a5a..ec8ed351 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -18,8 +18,8 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/3, message/4, properties/1, delivery/5]). --export([publish/4, publish/7]). +-export([publish/1, message/3, message/4, properties/1, delivery/4]). +-export([publish/4, publish/6]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -37,9 +37,8 @@ -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). --spec(delivery/5 :: - (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), - rabbit_types:message(), undefined | integer()) -> +-spec(delivery/4 :: + (boolean(), boolean(), rabbit_types:message(), undefined | integer()) -> rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), @@ -53,10 +52,9 @@ -spec(publish/4 :: (exchange_input(), rabbit_router:routing_key(), properties_input(), body_input()) -> publish_result()). --spec(publish/7 :: +-spec(publish/6 :: (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(), - rabbit_types:maybe(rabbit_types:txn()), properties_input(), - body_input()) -> publish_result()). + properties_input(), body_input()) -> publish_result()). -spec(build_content/2 :: (rabbit_framing:amqp_property_record(), binary() | [binary()]) -> rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> @@ -73,9 +71,9 @@ publish(Delivery = #delivery{ Other -> Other end. -delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) -> - #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, - sender = self(), message = Message, msg_seq_no = MsgSeqNo}. +delivery(Mandatory, Immediate, Message, MsgSeqNo) -> + #delivery{mandatory = Mandatory, immediate = Immediate, sender = self(), + message = Message, msg_seq_no = MsgSeqNo}. build_content(Properties, BodyBin) when is_binary(BodyBin) -> build_content(Properties, [BodyBin]); @@ -157,19 +155,17 @@ indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1). %% Convenience function, for avoiding round-trips in calls across the %% erlang distributed network. publish(Exchange, RoutingKeyBin, Properties, Body) -> - publish(Exchange, RoutingKeyBin, false, false, none, Properties, - Body). + publish(Exchange, RoutingKeyBin, false, false, Properties, Body). %% Convenience function, for avoiding round-trips in calls across the %% erlang distributed network. -publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Txn, - Props, Body) -> - publish(X, delivery(Mandatory, Immediate, Txn, +publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) -> + publish(X, delivery(Mandatory, Immediate, message(XName, RKey, properties(Props), Body), undefined)); -publish(XName, RKey, Mandatory, Immediate, Txn, Props, Body) -> +publish(XName, RKey, Mandatory, Immediate, Props, Body) -> case rabbit_exchange:lookup(XName) of - {ok, X} -> publish(X, RKey, Mandatory, Immediate, Txn, Props, Body); + {ok, X} -> publish(X, RKey, Mandatory, Immediate, Props, Body); Err -> Err end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 991b0b06..36471bf5 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -30,8 +30,7 @@ prioritise_cast/2]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, - limiter_pid, start_limiter_fun, transaction_id, tx_participants, - next_tag, uncommitted_ack_q, unacked_message_q, + limiter_pid, start_limiter_fun, next_tag, unacked_message_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, consumer_monitors, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, @@ -41,12 +40,10 @@ -define(STATISTICS_KEYS, [pid, - transactional, confirm, consumer_count, messages_unacknowledged, messages_unconfirmed, - acks_uncommitted, prefetch_count, client_flow_blocked]). @@ -173,10 +170,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, conn_pid = ConnPid, limiter_pid = undefined, start_limiter_fun = StartLimiterFun, - transaction_id = none, - tx_participants = sets:new(), next_tag = 1, - uncommitted_ack_q = queue:new(), unacked_message_q = queue:new(), user = User, virtual_host = VHost, @@ -331,7 +325,7 @@ handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> {hibernate, State#ch{stats_timer = StatsTimer1}}. terminate(Reason, State) -> - {Res, _State1} = rollback_and_notify(State), + {Res, _State1} = notify_queues(State), case Reason of normal -> ok = Res; shutdown -> ok = Res; @@ -386,8 +380,8 @@ send_exception(Reason, State = #ch{protocol = Protocol, rabbit_binary_generator:map_exception(Channel, Reason, Protocol), rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", [ConnPid, Channel, Reason]), - %% something bad's happened: rollback_and_notify may not be 'ok' - {_Result, State1} = rollback_and_notify(State), + %% something bad's happened: notify_queues may not be 'ok' + {_Result, State1} = notify_queues(State), case CloseChannel of Channel -> ok = rabbit_writer:send_command(WriterPid, CloseMethod), {noreply, State1}; @@ -589,7 +583,7 @@ handle_method(_Method, _, State = #ch{state = closing}) -> {noreply, State}; handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> - {ok, State1} = rollback_and_notify(State), + {ok, State1} = notify_queues(State), ReaderPid ! {channel_closing, self()}, {noreply, State1}; @@ -601,7 +595,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, mandatory = Mandatory, immediate = Immediate}, Content, State = #ch{virtual_host = VHostPath, - transaction_id = TxnKey, confirm_enabled = ConfirmEnabled, trace_state = TraceState}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), @@ -623,19 +616,15 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, rabbit_trace:tap_trace_in(Message, TraceState), {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( - Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, - MsgSeqNo)), + Exchange, rabbit_basic:delivery(Mandatory, Immediate, Message, + MsgSeqNo)), State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName, MsgSeqNo, Message, State1), maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || QPid <- DeliveredQPids]], publish, State2), - {noreply, case TxnKey of - none -> State2; - _ -> add_tx_participants(DeliveredQPids, State2) - end}; + {noreply, State2}; {error, Reason} -> rabbit_misc:protocol_error(precondition_failed, "invalid message: ~p", [Reason]) @@ -649,22 +638,12 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, - _, State = #ch{transaction_id = TxnKey, - unacked_message_q = UAMQ}) -> + _, State = #ch{unacked_message_q = UAMQ}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), - QIncs = ack(TxnKey, Acked), - Participants = [QPid || {QPid, _} <- QIncs], + QIncs = ack(Acked), maybe_incr_stats(QIncs, ack, State), - {noreply, case TxnKey of - none -> ok = notify_limiter(State#ch.limiter_pid, Acked), - State#ch{unacked_message_q = Remaining}; - _ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q, - Acked), - add_tx_participants( - Participants, - State#ch{unacked_message_q = Remaining, - uncommitted_ack_q = NewUAQ}) - end}; + ok = notify_limiter(State#ch.limiter_pid, Acked), + {noreply, State#ch{unacked_message_q = Remaining}}; handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, @@ -1048,35 +1027,6 @@ handle_method(#'queue.purge'{queue = QueueNameBin, #'queue.purge_ok'{message_count = PurgedMessageCount}); -handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) -> - rabbit_misc:protocol_error( - precondition_failed, "cannot switch from confirm to tx mode", []); - -handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none}) -> - {reply, #'tx.select_ok'{}, new_tx(State)}; - -handle_method(#'tx.select'{}, _, State) -> - {reply, #'tx.select_ok'{}, State}; - -handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) -> - rabbit_misc:protocol_error( - precondition_failed, "channel is not transactional", []); - -handle_method(#'tx.commit'{}, _, State) -> - {reply, #'tx.commit_ok'{}, internal_commit(State)}; - -handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> - rabbit_misc:protocol_error( - precondition_failed, "channel is not transactional", []); - -handle_method(#'tx.rollback'{}, _, State) -> - {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; - -handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId}) - when TxId =/= none -> - rabbit_misc:protocol_error( - precondition_failed, "cannot switch from tx to confirm mode", []); - handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> return_ok(State#ch{confirm_enabled = true}, NoWait, #'confirm.select_ok'{}); @@ -1252,55 +1202,17 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> precondition_failed, "unknown delivery tag ~w", [DeliveryTag]) end. -add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) -> - State#ch{tx_participants = sets:union(Participants, - sets:from_list(MoreP))}. - -ack(TxnKey, UAQ) -> - fold_per_queue( - fun (QPid, MsgIds, L) -> - ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()), - [{QPid, length(MsgIds)} | L] - end, [], UAQ). - -make_tx_id() -> rabbit_guid:guid(). - -new_tx(State) -> - State#ch{transaction_id = make_tx_id(), - tx_participants = sets:new(), - uncommitted_ack_q = queue:new()}. - -internal_commit(State = #ch{transaction_id = TxnKey, - tx_participants = Participants}) -> - case rabbit_amqqueue:commit_all(sets:to_list(Participants), - TxnKey, self()) of - ok -> ok = notify_limiter(State#ch.limiter_pid, - State#ch.uncommitted_ack_q), - new_tx(State); - {error, Errors} -> rabbit_misc:protocol_error( - internal_error, "commit failed: ~w", [Errors]) - end. +ack(UAQ) -> + fold_per_queue(fun (QPid, MsgIds, L) -> + ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), + [{QPid, length(MsgIds)} | L] + end, [], UAQ). -internal_rollback(State = #ch{transaction_id = TxnKey, - tx_participants = Participants, - uncommitted_ack_q = UAQ, - unacked_message_q = UAMQ}) -> - ?LOGDEBUG("rollback ~p~n - ~p acks uncommitted, ~p messages unacked~n", - [self(), - queue:len(UAQ), - queue:len(UAMQ)]), - ok = rabbit_amqqueue:rollback_all(sets:to_list(Participants), - TxnKey, self()), - NewUAMQ = queue:join(UAQ, UAMQ), - new_tx(State#ch{unacked_message_q = NewUAMQ}). - -rollback_and_notify(State = #ch{state = closing}) -> +notify_queues(State = #ch{state = closing}) -> {ok, State}; -rollback_and_notify(State = #ch{transaction_id = none}) -> - {notify_queues(State), State#ch{state = closing}}; -rollback_and_notify(State) -> - State1 = internal_rollback(State), - {notify_queues(State1), State1#ch{state = closing}}. +notify_queues(State = #ch{consumer_mapping = Consumers}) -> + {rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()), + State#ch{state = closing}}. fold_per_queue(F, Acc0, UAQ) -> D = rabbit_misc:queue_fold( @@ -1319,9 +1231,6 @@ start_limiter(State = #ch{unacked_message_q = UAMQ, start_limiter_fun = SLF}) -> ok = limit_queues(LPid, State), LPid. -notify_queues(#ch{consumer_mapping = Consumers}) -> - rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()). - unlimit_queues(State) -> ok = limit_queues(undefined, State), undefined. @@ -1436,17 +1345,13 @@ i(connection, #ch{conn_pid = ConnPid}) -> ConnPid; i(number, #ch{channel = Channel}) -> Channel; i(user, #ch{user = User}) -> User#user.username; i(vhost, #ch{virtual_host = VHost}) -> VHost; -i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) -> gb_trees:size(UMQ); -i(messages_unacknowledged, #ch{unacked_message_q = UAMQ, - uncommitted_ack_q = UAQ}) -> - queue:len(UAMQ) + queue:len(UAQ); -i(acks_uncommitted, #ch{uncommitted_ack_q = UAQ}) -> - queue:len(UAQ); +i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> + queue:len(UAMQ); i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> rabbit_limiter:get_limit(LimiterPid); i(client_flow_blocked, #ch{limiter_pid = LimiterPid}) -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 9eef384a..6eb1aaba 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -301,7 +301,7 @@ action(list_connections, Node, Args, _Opts, Inform) -> action(list_channels, Node, Args, _Opts, Inform) -> Inform("Listing channels", []), - ArgAtoms = default_if_empty(Args, [pid, user, transactional, consumer_count, + ArgAtoms = default_if_empty(Args, [pid, user, consumer_count, messages_unacknowledged]), display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]), ArgAtoms); diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 3fb0817a..93aad9e3 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -71,7 +71,7 @@ publish1(RoutingKey, Format, Data, LogExch) -> %% second resolution, not millisecond. Timestamp = rabbit_misc:now_ms() div 1000, {ok, _RoutingRes, _DeliveredQPids} = - rabbit_basic:publish(LogExch, RoutingKey, false, false, none, + rabbit_basic:publish(LogExch, RoutingKey, false, false, #'P_basic'{content_type = <<"text/plain">>, timestamp = Timestamp}, list_to_binary(io_lib:format(Format, Data))), diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 2727c1d0..4906937b 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -70,7 +70,7 @@ %% group. Because the master is the bq of amqqueue_process, it doesn't %% have sole control over its mailbox, and as a result, the master %% itself cannot be passed messages directly (well, it could by via -%% the amqqueue:run_backing_queue_async callback but that would induce +%% the amqqueue:run_backing_queue callback but that would induce %% additional unnecessary loading on the master queue process), yet it %% needs to react to gm events, such as the death of slaves. Thus the %% master creates the coordinator, and it is the coordinator that is @@ -254,45 +254,6 @@ %% sender_death message. The slave will then be able to tidy up its %% state as normal. %% -%% We don't support transactions on mirror queues. To do so is -%% challenging. The underlying bq is free to add the contents of the -%% txn to the queue proper at any point after the tx.commit comes in -%% but before the tx.commit-ok goes out. This means that it is not -%% safe for all mirrors to simply issue the bq:tx_commit at the same -%% time, as the addition of the txn's contents to the queue may -%% subsequently be inconsistently interwoven with other actions on the -%% bq. The solution to this is, in the master, wrap the PostCommitFun -%% and do the gm:broadcast in there: at that point, you're in the bq -%% (well, there's actually nothing to stop that function being invoked -%% by some other process, but let's pretend for now: you could always -%% use run_backing_queue to ensure you really are in the queue process -%% (the _async variant would be unsafe from an ordering pov)), the -%% gm:broadcast is safe because you don't have to worry about races -%% with other gm:broadcast calls (same process). Thus this signal -%% would indicate sufficiently to all the slaves that they must insert -%% the complete contents of the txn at precisely this point in the -%% stream of events. -%% -%% However, it's quite difficult for the slaves to make that happen: -%% they would be forced to issue the bq:tx_commit at that point, but -%% then stall processing any further instructions from gm until they -%% receive the notification from their bq that the tx_commit has fully -%% completed (i.e. they need to treat what is an async system as being -%% fully synchronous). This is not too bad (apart from the -%% vomit-inducing notion of it all): just need a queue of instructions -%% from the GM; but then it gets rather worse when you consider what -%% needs to happen if the master dies at this point and the slave in -%% the middle of this tx_commit needs to be promoted. -%% -%% Finally, we can't possibly hope to make transactions atomic across -%% mirror queues, and it's not even clear that that's desirable: if a -%% slave fails whilst there's an open transaction in progress then -%% when the channel comes to commit the txn, it will detect the -%% failure and destroy the channel. However, the txn will have -%% actually committed successfully in all the other mirrors (including -%% master). To do this bit properly would require 2PC and all the -%% baggage that goes with that. -%% %% Recovery of mirrored queues is straightforward: as nodes die, the %% remaining nodes record this, and eventually a situation is reached %% in which only one node is alive, which is the master. This is the diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 463b8cfb..9e0ffb13 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -16,13 +16,12 @@ -module(rabbit_mirror_queue_master). --export([init/4, terminate/2, delete_and_terminate/2, +-export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, - tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/3, discard/3]). + status/1, invoke/3, is_duplicate/2, discard/3]). -export([start/1, stop/0]). @@ -62,7 +61,7 @@ stop() -> sender_death_fun() -> Self = self(), fun (DeadPid) -> - rabbit_amqqueue:run_backing_queue_async( + rabbit_amqqueue:run_backing_queue( Self, ?MODULE, fun (?MODULE, State = #state { gm = GM, known_senders = KS }) -> ok = gm:broadcast(GM, {sender_death, DeadPid}), @@ -72,7 +71,7 @@ sender_death_fun() -> end. init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, - AsyncCallback, SyncCallback) -> + AsyncCallback) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q, undefined, sender_death_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), @@ -84,7 +83,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, end) -- [node()], [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), - BQS = BQ:init(Q, Recover, AsyncCallback, SyncCallback), + BQS = BQ:init(Q, Recover, AsyncCallback), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -243,21 +242,6 @@ ack(AckTags, State = #state { gm = GM, {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. -tx_publish(_Txn, _Msg, _MsgProps, _ChPid, State) -> - %% We don't support txns in mirror queues - State. - -tx_ack(_Txn, _AckTags, State) -> - %% We don't support txns in mirror queues - State. - -tx_rollback(_Txn, State) -> - {[], State}. - -tx_commit(_Txn, PostCommitFun, _MsgPropsFun, State) -> - PostCommitFun(), %% Probably must run it to avoid deadlocks - {[], State}. - requeue(AckTags, MsgPropsFun, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> @@ -299,7 +283,7 @@ invoke(Mod, Fun, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. -is_duplicate(none, Message = #basic_message { id = MsgId }, +is_duplicate(Message = #basic_message { id = MsgId }, State = #state { seen_status = SS, backing_queue = BQ, backing_queue_state = BQS, @@ -341,11 +325,7 @@ is_duplicate(none, Message = #basic_message { id = MsgId }, %% Don't erase from SS here because discard/2 is about to %% be called and we need to be able to detect this case {discarded, State} - end; -is_duplicate(_Txn, _Msg, State) -> - %% In a transaction. We don't support txns in mirror queues. But - %% it's probably not a duplicate... - {false, State}. + end. discard(Msg = #basic_message { id = MsgId }, ChPid, State = #state { gm = GM, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 55d61d41..93340ba8 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -167,14 +167,7 @@ handle_call({gm_deaths, Deaths}, From, {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State} - end; - -handle_call({run_backing_queue, Mod, Fun}, _From, State) -> - reply(ok, run_backing_queue(Mod, Fun, State)); - -handle_call({commit, _Txn, _ChPid}, _From, State) -> - %% We don't support transactions in mirror queues - reply(ok, State). + end. handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -208,11 +201,7 @@ handle_cast(update_ram_duration, handle_cast(sync_timeout, State) -> noreply(backing_queue_timeout( - State #state { sync_timer_ref = undefined })); - -handle_cast({rollback, _Txn, _ChPid}, State) -> - %% We don't support transactions in mirror queues - noreply(State). + State #state { sync_timer_ref = undefined })). handle_info(timeout, State) -> noreply(backing_queue_timeout(State)); @@ -271,7 +260,6 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, prioritise_call(Msg, _From, _State) -> case Msg of - {run_backing_queue, _Mod, _Fun} -> 6; {gm_deaths, _Deaths} -> 5; _ -> 0 end. @@ -331,14 +319,7 @@ bq_init(BQ, Q, Recover) -> Self = self(), BQ:init(Q, Recover, fun (Mod, Fun) -> - rabbit_amqqueue:run_backing_queue_async(Self, Mod, Fun) - end, - fun (Mod, Fun) -> - rabbit_misc:with_exit_handler( - fun () -> error end, - fun () -> - rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) - end) + rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) end). run_backing_queue(rabbit_mirror_queue_master, Fun, State) -> @@ -488,7 +469,7 @@ promote_me(From, #state { q = Q, %% %% Everything that's in MA gets requeued. Consequently the new %% master should start with a fresh AM as there are no messages - %% pending acks (txns will have been rolled back). + %% pending acks. MSList = dict:to_list(MS), SS = dict:from_list( @@ -605,15 +586,14 @@ confirm_sender_death(Pid) -> %% Note that we do not remove our knowledge of this ChPid until we %% get the sender_death from GM. {ok, _TRef} = timer:apply_after( - ?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue_async, + ?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue, [self(), rabbit_mirror_queue_master, Fun]), ok. maybe_enqueue_message( Delivery = #delivery { message = #basic_message { id = MsgId }, msg_seq_no = MsgSeqNo, - sender = ChPid, - txn = none }, + sender = ChPid }, EnqueueOnPromotion, State = #state { sender_queues = SQ, msg_id_status = MS }) -> State1 = ensure_monitoring(ChPid, State), @@ -655,10 +635,7 @@ maybe_enqueue_message( SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), State1 #state { msg_id_status = dict:erase(MsgId, MS), sender_queues = SQ1 } - end; -maybe_enqueue_message(_Delivery, _EnqueueOnPromotion, State) -> - %% We don't support txns in mirror queues. - State. + end. get_sender_queue(ChPid, SQ) -> case dict:find(ChPid, SQ) of diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index aaf3df78..bf89cdb2 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -76,11 +76,10 @@ %% the segment file combined with the journal, no writing needs to be %% done to the segment file either (in fact it is deleted if it exists %% at all). This is safe given that the set of acks is a subset of the -%% set of publishes. When it's necessary to sync messages because of -%% transactions, it's only necessary to fsync on the journal: when -%% entries are distributed from the journal to segment files, those -%% segments appended to are fsync'd prior to the journal being -%% truncated. +%% set of publishes. When it is necessary to sync messages, it is +%% sufficient to fsync on the journal: when entries are distributed +%% from the journal to segment files, those segments appended to are +%% fsync'd prior to the journal being truncated. %% %% This module is also responsible for scanning the queue index files %% and seeding the message store on start up. @@ -289,14 +288,13 @@ sync(State = #qistate { unsynced_msg_ids = MsgIds }) -> sync_if([] =/= MsgIds, State). sync(SeqIds, State) -> - %% The SeqIds here contains the SeqId of every publish and ack in - %% the transaction. Ideally we should go through these seqids and - %% only sync the journal if the pubs or acks appear in the + %% The SeqIds here contains the SeqId of every publish and ack to + %% be sync'ed. Ideally we should go through these seqids and only + %% sync the journal if the pubs or acks appear in the %% journal. However, this would be complex to do, and given that %% the variable queue publishes and acks to the qi, and then %% syncs, all in one operation, there is no possibility of the - %% seqids not being in the journal, provided the transaction isn't - %% emptied (handled by sync_if anyway). + %% seqids not being in the journal. sync_if([] =/= SeqIds, State). flush(State = #qistate { dirty_count = 0 }) -> State; diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3ee71a6d..63676fef 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -705,7 +705,6 @@ test_topic_expect_match(X, List) -> Res = rabbit_exchange_type_topic:route( X, #delivery{mandatory = false, immediate = false, - txn = none, sender = self(), message = Message}), ExpectedRes = lists:map( @@ -2084,7 +2083,7 @@ test_queue_index() -> variable_queue_init(Q, Recover) -> rabbit_variable_queue:init( - Q, Recover, fun nop/2, fun nop/2, fun nop/2, fun nop/1). + Q, Recover, fun nop/2, fun nop/2, fun nop/1). variable_queue_publish(IsPersistent, Count, VQ) -> variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). @@ -2132,6 +2131,29 @@ with_fresh_variable_queue(Fun) -> _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)), passed. +publish_and_confirm(QPid, Payload, Count) -> + Seqs = lists:seq(1, Count), + [begin + Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{delivery_mode = 2}, + Payload), + Delivery = #delivery{mandatory = false, immediate = false, + sender = self(), message = Msg, msg_seq_no = Seq}, + true = rabbit_amqqueue:deliver(QPid, Delivery) + end || Seq <- Seqs], + wait_for_confirms(gb_sets:from_list(Seqs)). + +wait_for_confirms(Unconfirmed) -> + case gb_sets:is_empty(Unconfirmed) of + true -> ok; + false -> receive {'$gen_cast', {confirm, Confirmed, _}} -> + wait_for_confirms( + gb_sets:difference(Unconfirmed, + gb_sets:from_list(Confirmed))) + after 1000 -> exit(timeout_waiting_for_confirm) + end + end. + test_variable_queue() -> [passed = with_fresh_variable_queue(F) || F <- [fun test_variable_queue_dynamic_duration_change/1, @@ -2325,17 +2347,10 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> test_queue_recover() -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), - TxID = rabbit_guid:guid(), {new, #amqqueue { pid = QPid, name = QName } = Q} = rabbit_amqqueue:declare(test_queue(), true, false, [], none), - [begin - Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{delivery_mode = 2}, <<>>), - Delivery = #delivery{mandatory = false, immediate = false, txn = TxID, - sender = self(), message = Msg}, - true = rabbit_amqqueue:deliver(QPid, Delivery) - end || _ <- lists:seq(1, Count)], - rabbit_amqqueue:commit_all([QPid], TxID, self()), + publish_and_confirm(QPid, <<>>, Count), + exit(QPid, kill), MRef = erlang:monitor(process, QPid), receive {'DOWN', MRef, process, QPid, _Info} -> ok @@ -2362,18 +2377,10 @@ test_variable_queue_delete_msg_store_files_callback() -> ok = restart_msg_store_empty(), {new, #amqqueue { pid = QPid, name = QName } = Q} = rabbit_amqqueue:declare(test_queue(), true, false, [], none), - TxID = rabbit_guid:guid(), Payload = <<0:8388608>>, %% 1MB Count = 30, - [begin - Msg = rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{delivery_mode = 2}, Payload), - Delivery = #delivery{mandatory = false, immediate = false, txn = TxID, - sender = self(), message = Msg}, - true = rabbit_amqqueue:deliver(QPid, Delivery) - end || _ <- lists:seq(1, Count)], - rabbit_amqqueue:commit_all([QPid], TxID, self()), + publish_and_confirm(QPid, Payload, Count), + rabbit_amqqueue:set_ram_duration_target(QPid, 0), CountMinusOne = Count - 1, diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 03b2c9e8..2db960ac 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -20,7 +20,7 @@ -ifdef(use_specs). --export_type([txn/0, maybe/1, info/0, infos/0, info_key/0, info_keys/0, +-export_type([maybe/1, info/0, infos/0, info_key/0, info_keys/0, message/0, msg_id/0, basic_message/0, delivery/0, content/0, decoded_content/0, undecoded_content/0, unencoded_content/0, encoded_content/0, message_properties/0, @@ -73,16 +73,12 @@ -type(delivery() :: #delivery{mandatory :: boolean(), immediate :: boolean(), - txn :: maybe(txn()), sender :: pid(), message :: message()}). -type(message_properties() :: #message_properties{expiry :: pos_integer() | 'undefined', needs_confirming :: boolean()}). -%% this is really an abstract type, but dialyzer does not support them --type(txn() :: rabbit_guid:guid()). - -type(info_key() :: atom()). -type(info_keys() :: [info_key()]). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c6d99deb..ea72de66 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,20 +16,18 @@ -module(rabbit_variable_queue). --export([init/4, terminate/2, delete_and_terminate/2, +-export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, - dropwhile/2, fetch/2, ack/2, - tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, - requeue/3, len/1, is_empty/1, + dropwhile/2, fetch/2, ack/2, requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/3, discard/3, + status/1, invoke/3, is_duplicate/2, discard/3, multiple_routing_keys/0]). -export([start/1, stop/0]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/6]). +-export([start_msg_store/2, stop_msg_store/0, init/5]). %%---------------------------------------------------------------------------- %% Definitions: @@ -239,12 +237,10 @@ ram_ack_index, index_state, msg_store_clients, - on_sync, durable, transient_threshold, async_callback, - sync_callback, len, persistent_count, @@ -285,10 +281,6 @@ end_seq_id %% end_seq_id is exclusive }). --record(tx, { pending_messages, pending_acks }). - --record(sync, { acks_persistent, acks_all, pubs, funs }). - %% When we discover, on publish, that we should write some indices to %% disk for some betas, the IO_BATCH_SIZE sets the number of betas %% that we must be due to write indices for before we do any work at @@ -321,12 +313,6 @@ count :: non_neg_integer(), end_seq_id :: non_neg_integer() }). --type(sync() :: #sync { acks_persistent :: [[seq_id()]], - acks_all :: [[seq_id()]], - pubs :: [{message_properties_transformer(), - [rabbit_types:basic_message()]}], - funs :: [fun (() -> any())] }). - -type(state() :: #vqstate { q1 :: queue(), q2 :: bpqueue:bpqueue(), @@ -339,12 +325,10 @@ index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, - on_sync :: sync(), durable :: boolean(), transient_threshold :: non_neg_integer(), async_callback :: async_callback(), - sync_callback :: sync_callback(), len :: non_neg_integer(), persistent_count :: non_neg_integer(), @@ -377,11 +361,6 @@ count = 0, end_seq_id = Z }). --define(BLANK_SYNC, #sync { acks_persistent = [], - acks_all = [], - pubs = [], - funs = [] }). - %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -410,17 +389,17 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(Queue, Recover, AsyncCallback, SyncCallback) -> - init(Queue, Recover, AsyncCallback, SyncCallback, +init(Queue, Recover, AsyncCallback) -> + init(Queue, Recover, AsyncCallback, fun (MsgIds, ActionTaken) -> msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken) end, fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end). init(#amqqueue { name = QueueName, durable = IsDurable }, false, - AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> + AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), - init(IsDurable, IndexState, 0, [], AsyncCallback, SyncCallback, + init(IsDurable, IndexState, 0, [], AsyncCallback, case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, MsgOnDiskFun, AsyncCallback); @@ -429,7 +408,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, false, msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); init(#amqqueue { name = QueueName, durable = true }, true, - AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> + AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, TRef, Terms1} = case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of @@ -450,14 +429,14 @@ init(#amqqueue { name = QueueName, durable = true }, true, rabbit_msg_store:contains(MsgId, PersistentClient) end, MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback, + init(true, IndexState, DeltaCount, Terms1, AsyncCallback, PersistentClient, TransientClient). terminate(_Reason, State) -> State1 = #vqstate { persistent_count = PCount, index_state = IndexState, msg_store_clients = {MSCStateP, MSCStateT} } = - remove_pending_ack(true, tx_commit_index(State)), + remove_pending_ack(true, State), PRef = case MSCStateP of undefined -> undefined; _ -> ok = rabbit_msg_store:client_terminate(MSCStateP), @@ -590,59 +569,6 @@ ack(AckTags, State) -> AckTags, State), {MsgIds, a(State1)}. -tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, - _ChPid, State = #vqstate { durable = IsDurable, - msg_store_clients = MSCState }) -> - Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), - case IsPersistent andalso IsDurable of - true -> MsgStatus = msg_status(true, undefined, Msg, MsgProps), - #msg_status { msg_on_disk = true } = - maybe_write_msg_to_disk(false, MsgStatus, MSCState); - false -> ok - end, - a(State). - -tx_ack(Txn, AckTags, State) -> - Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), - State. - -tx_rollback(Txn, State = #vqstate { durable = IsDurable, - msg_store_clients = MSCState }) -> - #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), - erase_tx(Txn), - ok = case IsDurable of - true -> msg_store_remove(MSCState, true, - persistent_msg_ids(Pubs)); - false -> ok - end, - {lists:append(AckTags), a(State)}. - -tx_commit(Txn, Fun, MsgPropsFun, - State = #vqstate { durable = IsDurable, - async_callback = AsyncCallback, - sync_callback = SyncCallback, - msg_store_clients = MSCState }) -> - #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), - erase_tx(Txn), - AckTags1 = lists:append(AckTags), - PersistentMsgIds = persistent_msg_ids(Pubs), - HasPersistentPubs = PersistentMsgIds =/= [], - {AckTags1, - a(case IsDurable andalso HasPersistentPubs of - true -> MsgStoreCallback = - fun () -> msg_store_callback( - PersistentMsgIds, Pubs, AckTags1, Fun, - MsgPropsFun, AsyncCallback, SyncCallback) - end, - ok = msg_store_sync(MSCState, true, PersistentMsgIds, - fun () -> spawn(MsgStoreCallback) end), - State; - false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1, - Fun, MsgPropsFun, State) - end)}. - requeue(AckTags, MsgPropsFun, State) -> MsgPropsFun1 = fun (MsgProps) -> (MsgPropsFun(MsgProps)) #message_properties { @@ -748,23 +674,22 @@ ram_duration(State = #vqstate { ram_msg_count_prev = RamMsgCount, ram_ack_count_prev = RamAckCount }}. -needs_timeout(State = #vqstate { on_sync = OnSync }) -> - case {OnSync, needs_index_sync(State)} of - {?BLANK_SYNC, false} -> - case reduce_memory_use(fun (_Quota, State1) -> {0, State1} end, - fun (_Quota, State1) -> State1 end, - fun (State1) -> State1 end, - fun (_Quota, State1) -> {0, State1} end, - State) of - {true, _State} -> idle; - {false, _State} -> false - end; - _ -> - timed +needs_timeout(State) -> + case needs_index_sync(State) of + false -> case reduce_memory_use( + fun (_Quota, State1) -> {0, State1} end, + fun (_Quota, State1) -> State1 end, + fun (State1) -> State1 end, + fun (_Quota, State1) -> {0, State1} end, + State) of + {true, _State} -> idle; + {false, _State} -> false + end; + true -> timed end. timeout(State) -> - a(reduce_memory_use(confirm_commit_index(tx_commit_index(State)))). + a(reduce_memory_use(confirm_commit_index(State))). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. @@ -774,7 +699,6 @@ status(#vqstate { len = Len, pending_ack = PA, ram_ack_index = RAI, - on_sync = #sync { funs = From }, target_ram_count = TargetRamCount, ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, @@ -791,7 +715,6 @@ status(#vqstate { {q4 , queue:len(Q4)}, {len , Len}, {pending_acks , dict:size(PA)}, - {outstanding_txns , length(From)}, {target_ram_count , TargetRamCount}, {ram_msg_count , RamMsgCount}, {ram_ack_count , gb_trees:size(RAI)}, @@ -803,10 +726,9 @@ status(#vqstate { {avg_ack_ingress_rate, AvgAckIngressRate}, {avg_ack_egress_rate , AvgAckEgressRate} ]. -invoke(?MODULE, Fun, State) -> - Fun(?MODULE, State). +invoke(?MODULE, Fun, State) -> Fun(?MODULE, State). -is_duplicate(_Txn, _Msg, State) -> {false, State}. +is_duplicate(_Msg, State) -> {false, State}. discard(_Msg, _ChPid, State) -> State. @@ -902,11 +824,6 @@ msg_store_remove(MSCState, IsPersistent, MsgIds) -> MSCState, IsPersistent, fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end). -msg_store_sync(MSCState, IsPersistent, MsgIds, Fun) -> - with_immutable_msg_store_state( - MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:sync(MsgIds, Fun, MSCState1) end). - msg_store_close_fds(MSCState, IsPersistent) -> with_msg_store_state( MSCState, IsPersistent, @@ -923,20 +840,6 @@ maybe_write_delivered(false, _SeqId, IndexState) -> maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). -lookup_tx(Txn) -> case get({txn, Txn}) of - undefined -> #tx { pending_messages = [], - pending_acks = [] }; - V -> V - end. - -store_tx(Txn, Tx) -> put({txn, Txn}, Tx). - -erase_tx(Txn) -> erase({txn, Txn}). - -persistent_msg_ids(Pubs) -> - [MsgId || {#basic_message { id = MsgId, - is_persistent = true }, _MsgProps} <- Pubs]. - betas_from_index_entries(List, TransientThreshold, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( @@ -1000,8 +903,8 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -init(IsDurable, IndexState, DeltaCount, Terms, - AsyncCallback, SyncCallback, PersistentClient, TransientClient) -> +init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, + PersistentClient, TransientClient) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), @@ -1023,12 +926,10 @@ init(IsDurable, IndexState, DeltaCount, Terms, ram_ack_index = gb_trees:empty(), index_state = IndexState1, msg_store_clients = {PersistentClient, TransientClient}, - on_sync = ?BLANK_SYNC, durable = IsDurable, transient_threshold = NextSeqId, async_callback = AsyncCallback, - sync_callback = SyncCallback, len = DeltaCount1, persistent_count = DeltaCount1, @@ -1146,88 +1047,6 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { len = Len1, persistent_count = PCount1 }}. -msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun, - AsyncCallback, SyncCallback) -> - case SyncCallback(?MODULE, - fun (?MODULE, StateN) -> - tx_commit_post_msg_store(true, Pubs, AckTags, - Fun, MsgPropsFun, StateN) - end) of - ok -> ok; - error -> remove_persistent_messages(PersistentMsgIds, AsyncCallback) - end. - -remove_persistent_messages(MsgIds, AsyncCallback) -> - PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, - undefined, AsyncCallback), - ok = rabbit_msg_store:remove(MsgIds, PersistentClient), - rabbit_msg_store:client_delete_and_terminate(PersistentClient). - -tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun, - State = #vqstate { - on_sync = OnSync = #sync { - acks_persistent = SPAcks, - acks_all = SAcks, - pubs = SPubs, - funs = SFuns }, - pending_ack = PA, - durable = IsDurable }) -> - PersistentAcks = - case IsDurable of - true -> [AckTag || AckTag <- AckTags, - case dict:fetch(AckTag, PA) of - #msg_status {} -> - false; - {IsPersistent, _MsgId, _MsgProps} -> - IsPersistent - end]; - false -> [] - end, - case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of - true -> State #vqstate { - on_sync = #sync { - acks_persistent = [PersistentAcks | SPAcks], - acks_all = [AckTags | SAcks], - pubs = [{MsgPropsFun, Pubs} | SPubs], - funs = [Fun | SFuns] }}; - false -> State1 = tx_commit_index( - State #vqstate { - on_sync = #sync { - acks_persistent = [], - acks_all = [AckTags], - pubs = [{MsgPropsFun, Pubs}], - funs = [Fun] } }), - State1 #vqstate { on_sync = OnSync } - end. - -tx_commit_index(State = #vqstate { on_sync = ?BLANK_SYNC }) -> - State; -tx_commit_index(State = #vqstate { on_sync = #sync { - acks_persistent = SPAcks, - acks_all = SAcks, - pubs = SPubs, - funs = SFuns }, - durable = IsDurable }) -> - PAcks = lists:append(SPAcks), - Acks = lists:append(SAcks), - Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), - {Msg, MsgProps} <- lists:reverse(PubsN)], - {_MsgIds, State1} = ack(Acks, State), - {SeqIds, State2 = #vqstate { index_state = IndexState }} = - lists:foldl( - fun ({Msg = #basic_message { is_persistent = IsPersistent }, - MsgProps}, - {SeqIdsAcc, State3}) -> - IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State4} = - publish(Msg, MsgProps, false, IsPersistent1, State3), - {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State4} - end, {PAcks, State1}, Pubs), - IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), - [ Fun() || Fun <- lists:reverse(SFuns) ], - reduce_memory_use( - State2 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). - purge_betas_and_deltas(LensByStore, State = #vqstate { q3 = Q3, index_state = IndexState, |