From 39f46d22ed36b864a38cafe9b8f18ce5bec313d8 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 3 May 2010 21:20:14 +0100 Subject: pluggable queue backends - behaviour and specs for pluggable queue backends - reworking of the queue code to support pluggability, including hookups to memory monitoring and file handle management - interface to existing persister via the new API All these changes were cherry-picked from the bug21673 branch. --- ebin/rabbit_app.in | 1 + include/rabbit_backing_queue_spec.hrl | 63 ++++ src/rabbit_amqqueue.erl | 20 +- src/rabbit_amqqueue_process.erl | 582 +++++++++++++++++----------------- src/rabbit_backing_queue.erl | 133 ++++++++ src/rabbit_invariable_queue.erl | 264 +++++++++++++++ 6 files changed, 772 insertions(+), 291 deletions(-) create mode 100644 include/rabbit_backing_queue_spec.hrl create mode 100644 src/rabbit_backing_queue.erl create mode 100644 src/rabbit_invariable_queue.erl diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index ad8e3549..bdf407eb 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -18,6 +18,7 @@ {ssl_listeners, []}, {ssl_options, []}, {vm_memory_high_watermark, 0.4}, + {backing_queue_module, rabbit_invariable_queue}, {persister_max_wrap_entries, 500}, {persister_hibernate_after, 10000}, {default_user, <<"guest">>}, diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl new file mode 100644 index 00000000..1b536dfa --- /dev/null +++ b/include/rabbit_backing_queue_spec.hrl @@ -0,0 +1,63 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-type(fetch_result() :: + %% Message, IsDelivered, AckTag, Remaining_Len + ('empty'|{basic_message(), boolean(), ack(), non_neg_integer()})). +-type(is_durable() :: boolean()). +-type(attempt_recovery() :: boolean()). +-type(purged_msg_count() :: non_neg_integer()). +-type(ack_required() :: boolean()). + +-spec(start/1 :: ([queue_name()]) -> 'ok'). +-spec(init/3 :: (queue_name(), is_durable(), attempt_recovery()) -> state()). +-spec(terminate/1 :: (state()) -> state()). +-spec(delete_and_terminate/1 :: (state()) -> state()). +-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). +-spec(publish/2 :: (basic_message(), state()) -> state()). +-spec(publish_delivered/3 :: + (ack_required(), basic_message(), state()) -> {ack(), state()}). +-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). +-spec(ack/2 :: ([ack()], state()) -> state()). +-spec(tx_publish/3 :: (txn(), basic_message(), state()) -> state()). +-spec(tx_ack/3 :: (txn(), [ack()], state()) -> state()). +-spec(tx_rollback/2 :: (txn(), state()) -> {[ack()], state()}). +-spec(tx_commit/3 :: (txn(), fun (() -> any()), state()) -> {[ack()], state()}). +-spec(requeue/2 :: ([ack()], state()) -> state()). +-spec(len/1 :: (state()) -> non_neg_integer()). +-spec(is_empty/1 :: (state()) -> boolean()). +-spec(set_ram_duration_target/2 :: + (('undefined' | 'infinity' | number()), state()) -> state()). +-spec(ram_duration/1 :: (state()) -> {number(), state()}). +-spec(needs_sync/1 :: (state()) -> boolean()). +-spec(sync/1 :: (state()) -> state()). +-spec(handle_pre_hibernate/1 :: (state()) -> state()). +-spec(status/1 :: (state()) -> [{atom(), any()}]). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b86cdd04..2d75b15b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -33,6 +33,8 @@ -export([start/0, declare/4, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1, + maybe_run_queue_via_backing_queue/2, + update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, @@ -109,6 +111,9 @@ -spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). +-spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> A))) -> 'ok'). +-spec(update_ram_duration/1 :: (pid()) -> 'ok'). +-spec(set_ram_duration_target/2 :: (pid(), number()) -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -119,9 +124,8 @@ start() -> DurableQueues = find_durable_queues(), - ok = rabbit_sup:start_child( - rabbit_persister, - [[QName || #amqqueue{name = QName} <- DurableQueues]]), + {ok, BQ} = application:get_env(backing_queue_module), + ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]), {ok,_} = supervisor:start_child( rabbit_sup, {rabbit_amqqueue_sup, @@ -352,6 +356,16 @@ internal_delete(QueueName) -> ok end. +maybe_run_queue_via_backing_queue(QPid, Fun) -> + gen_server2:pcall(QPid, 7, {maybe_run_queue_via_backing_queue, Fun}, + infinity). + +update_ram_duration(QPid) -> + gen_server2:pcast(QPid, 8, update_ram_duration). + +set_ram_duration_target(QPid, Duration) -> + gen_server2:pcast(QPid, 8, {set_ram_duration_target, Duration}). + set_maximum_since_use(QPid, Age) -> gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d745a69c..06712e9c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -35,11 +35,14 @@ -behaviour(gen_server2). --define(UNSENT_MESSAGE_LIMIT, 100). +-define(UNSENT_MESSAGE_LIMIT, 100). +-define(SYNC_INTERVAL, 5). %% milliseconds +-define(RAM_DURATION_UPDATE_INTERVAL, 5000). -export([start_link/1, info_keys/0]). --export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, + handle_info/2, handle_pre_hibernate/1]). -import(queue). -import(erlang). @@ -50,21 +53,22 @@ owner, exclusive_consumer, has_had_consumers, - next_msg_id, - message_buffer, + backing_queue, + backing_queue_state, active_consumers, - blocked_consumers}). + blocked_consumers, + sync_timer_ref, + rate_timer_ref + }). -record(consumer, {tag, ack_required}). --record(tx, {is_persistent, pending_messages, pending_acks}). - %% These are held in our process dictionary -record(cr, {consumer_count, ch_pid, limiter_pid, monitor_ref, - unacked_messages, + acktags, is_limit_active, txn, unsent_message_count}). @@ -82,7 +86,9 @@ messages_unacknowledged, messages, consumers, - memory]). + memory, + backing_queue_status + ]). %%---------------------------------------------------------------------------- @@ -94,34 +100,108 @@ info_keys() -> ?INFO_KEYS. init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), + process_flag(trap_exit, true), + {ok, BQ} = application:get_env(backing_queue_module), + {ok, #q{q = Q, owner = none, exclusive_consumer = none, has_had_consumers = false, - next_msg_id = 1, - message_buffer = undefined, + backing_queue = BQ, + backing_queue_state = undefined, active_consumers = queue:new(), - blocked_consumers = queue:new()}, hibernate, + blocked_consumers = queue:new(), + sync_timer_ref = undefined, + rate_timer_ref = undefined}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -terminate(_Reason, #q{message_buffer = undefined}) -> - ok; -terminate(_Reason, State) -> +terminate(shutdown, State = #q{backing_queue = BQ}) -> + terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); +terminate({shutdown, _}, State = #q{backing_queue = BQ}) -> + terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); +terminate(_Reason, State = #q{backing_queue = BQ}) -> %% FIXME: How do we cancel active subscriptions? - QName = qname(State), - lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end, - all_tx()), - ok = purge_message_buffer(QName, State#q.message_buffer), - ok = rabbit_amqqueue:internal_delete(QName). + State1 = terminate_shutdown(fun (BQS) -> BQ:delete_and_terminate(BQS) end, + State), + ok = rabbit_amqqueue:internal_delete(qname(State1)). code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------------------------------------- -reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}. +terminate_shutdown(Fun, State) -> + State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = + stop_sync_timer(stop_rate_timer(State)), + case BQS of + undefined -> State; + _ -> ok = rabbit_memory_monitor:deregister(self()), + BQS1 = lists:foldl( + fun (#cr{txn = none}, BQSN) -> + BQSN; + (#cr{txn = Txn}, BQSN) -> + {_AckTags, BQSN1} = + BQ:tx_rollback(Txn, BQSN), + BQSN1 + end, BQS, all_ch_record()), + State1#q{backing_queue_state = Fun(BQS1)} + end. -noreply(NewState) -> {noreply, NewState, hibernate}. +reply(Reply, NewState) -> + assert_invariant(NewState), + {NewState1, Timeout} = next_state(NewState), + {reply, Reply, NewState1, Timeout}. + +noreply(NewState) -> + assert_invariant(NewState), + {NewState1, Timeout} = next_state(NewState), + {noreply, NewState1, Timeout}. + +next_state(State) -> + State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = + ensure_rate_timer(State), + case BQ:needs_sync(BQS)of + true -> {ensure_sync_timer(State1), 0}; + false -> {stop_sync_timer(State1), hibernate} + end. + +ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) -> + {ok, TRef} = timer:apply_after( + ?SYNC_INTERVAL, + rabbit_amqqueue, maybe_run_queue_via_backing_queue, + [self(), fun (BQS) -> BQ:sync(BQS) end]), + State#q{sync_timer_ref = TRef}; +ensure_sync_timer(State) -> + State. + +stop_sync_timer(State = #q{sync_timer_ref = undefined}) -> + State; +stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + State#q{sync_timer_ref = undefined}. + +ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> + {ok, TRef} = timer:apply_after( + ?RAM_DURATION_UPDATE_INTERVAL, + rabbit_amqqueue, update_ram_duration, + [self()]), + State#q{rate_timer_ref = TRef}; +ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) -> + State#q{rate_timer_ref = undefined}; +ensure_rate_timer(State) -> + State. + +stop_rate_timer(State = #q{rate_timer_ref = undefined}) -> + State; +stop_rate_timer(State = #q{rate_timer_ref = just_measured}) -> + State#q{rate_timer_ref = undefined}; +stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + State#q{rate_timer_ref = undefined}. + +assert_invariant(#q{active_consumers = AC, + backing_queue = BQ, backing_queue_state = BQS}) -> + true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -137,7 +217,7 @@ ch_record(ChPid) -> C = #cr{consumer_count = 0, ch_pid = ChPid, monitor_ref = MonitorRef, - unacked_messages = dict:new(), + acktags = sets:new(), is_limit_active = false, txn = none, unsent_message_count = 0}, @@ -168,29 +248,33 @@ record_current_channel_tx(ChPid, Txn) -> %% that wasn't happening already) store_ch_record((ch_record(ChPid))#cr{txn = Txn}). -deliver_immediately(Message, IsDelivered, - State = #q{q = #amqqueue{name = QName}, - active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers, - next_msg_id = NextId}) -> +deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, + State = #q{q = #amqqueue{name = QName}, + active_consumers = ActiveConsumers, + blocked_consumers = BlockedConsumers}) -> case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, ActiveConsumersTail} -> C = #cr{limiter_pid = LimiterPid, unsent_message_count = Count, - unacked_messages = UAM} = ch_record(ChPid), - case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of + acktags = ChAckTags} = ch_record(ChPid), + IsMsgReady = PredFun(FunAcc, State), + case (IsMsgReady andalso + rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of true -> + {{Message, IsDelivered, AckTag}, FunAcc1, State1} = + DeliverFun(AckRequired, FunAcc, State), rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, IsDelivered, Message}), - NewUAM = case AckRequired of - true -> dict:store(NextId, Message, UAM); - false -> UAM - end, + {QName, self(), AckTag, IsDelivered, Message}), + ChAckTags1 = case AckRequired of + true -> sets:add_element( + AckTag, ChAckTags); + false -> ChAckTags + end, NewC = C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}, + acktags = ChAckTags1}, store_ch_record(NewC), {NewActiveConsumers, NewBlockedConsumers} = case ch_record_state_transition(C, NewC) of @@ -204,88 +288,85 @@ deliver_immediately(Message, IsDelivered, {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, - {offered, AckRequired, - State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers, - next_msg_id = NextId + 1}}; - false -> + State2 = State1#q{ + active_consumers = NewActiveConsumers, + blocked_consumers = NewBlockedConsumers}, + deliver_msgs_to_consumers(Funs, FunAcc1, State2); + %% if IsMsgReady then we've hit the limiter + false when IsMsgReady -> store_ch_record(C#cr{is_limit_active = true}), {NewActiveConsumers, NewBlockedConsumers} = move_consumers(ChPid, ActiveConsumers, BlockedConsumers), - deliver_immediately( - Message, IsDelivered, + deliver_msgs_to_consumers( + Funs, FunAcc, State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers}) - end; - {empty, _} -> - {not_offered, State} - end. - -run_message_queue(State = #q{message_buffer = MessageBuffer}) -> - run_message_queue(MessageBuffer, State). - -run_message_queue(MessageBuffer, State) -> - case queue:out(MessageBuffer) of - {{value, {Message, IsDelivered}}, BufferTail} -> - case deliver_immediately(Message, IsDelivered, State) of - {offered, true, NewState} -> - persist_delivery(qname(State), Message, IsDelivered), - run_message_queue(BufferTail, NewState); - {offered, false, NewState} -> - persist_auto_ack(qname(State), Message), - run_message_queue(BufferTail, NewState); - {not_offered, NewState} -> - NewState#q{message_buffer = MessageBuffer} + blocked_consumers = NewBlockedConsumers}); + false -> + %% no message was ready, so we don't need to block anyone + {FunAcc, State} end; {empty, _} -> - State#q{message_buffer = MessageBuffer} + {FunAcc, State} end. -attempt_delivery(none, _ChPid, Message, State) -> - case deliver_immediately(Message, false, State) of - {offered, false, State1} -> - {true, State1}; - {offered, true, State1} -> - persist_message(none, qname(State), Message), - persist_delivery(qname(State), Message, false), - {true, State1}; - {not_offered, State1} -> - {false, State1} - end; -attempt_delivery(Txn, ChPid, Message, State) -> - persist_message(Txn, qname(State), Message), - record_pending_message(Txn, ChPid, Message), - {true, State}. +deliver_from_queue_pred(IsEmpty, _State) -> + not IsEmpty. + +deliver_from_queue_deliver(AckRequired, false, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {{Message, IsDelivered, AckTag, Remaining}, BQS1} = + BQ:fetch(AckRequired, BQS), + {{Message, IsDelivered, AckTag}, 0 == Remaining, + State #q { backing_queue_state = BQS1 }}. + +run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + Funs = {fun deliver_from_queue_pred/2, + fun deliver_from_queue_deliver/3}, + IsEmpty = BQ:is_empty(BQS), + {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), + State1. + +attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> + PredFun = fun (IsEmpty, _State) -> not IsEmpty end, + DeliverFun = + fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> + {AckTag, BQS1} = + BQ:publish_delivered(AckRequired, Message, BQS), + {{Message, false, AckTag}, true, + State1#q{backing_queue_state = BQS1}} + end, + deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); +attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + record_current_channel_tx(ChPid, Txn), + {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. -deliver_or_enqueue(Txn, ChPid, Message, State) -> +deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> case attempt_delivery(Txn, ChPid, Message, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> - persist_message(Txn, qname(State), Message), - NewMB = queue:in({Message, false}, NewState#q.message_buffer), - {false, NewState#q{message_buffer = NewMB}} + %% Txn is none and no unblocked channels with consumers + BQS = BQ:publish(Message, State #q.backing_queue_state), + {false, NewState#q{backing_queue_state = BQS}} end. -deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) -> - run_message_queue(queue:join(MessageBuffer, queue:from_list(Messages)), - State). +requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> + maybe_run_queue_via_backing_queue( + fun (BQS) -> BQ:requeue(AckTags, BQS) end, State). add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). remove_consumer(ChPid, ConsumerTag, Queue) -> - %% TODO: replace this with queue:filter/2 once we move to R12 - queue:from_list(lists:filter( - fun ({CP, #consumer{tag = CT}}) -> - (CP /= ChPid) or (CT /= ConsumerTag) - end, queue:to_list(Queue))). + queue:filter(fun ({CP, #consumer{tag = CT}}) -> + (CP /= ChPid) or (CT /= ConsumerTag) + end, Queue). remove_consumers(ChPid, Queue) -> - %% TODO: replace this with queue:filter/2 once we move to R12 - queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end, - queue:to_list(Queue))). + queue:filter(fun ({CP, _}) -> CP /= ChPid end, Queue). move_consumers(ChPid, From, To) -> {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end, @@ -320,7 +401,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> not_found -> {ok, State}; #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn, - unacked_messages = UAM} -> + acktags = ChAckTags} -> erlang:demonitor(MonitorRef), erase({ch, ChPid}), State1 = State#q{ @@ -334,15 +415,12 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> ChPid, State#q.blocked_consumers)}, case should_auto_delete(State1) of true -> {stop, State1}; - false -> case Txn of - none -> ok; - _ -> ok = rollback_work(Txn, qname(State1)), - erase_tx(Txn) - end, - {ok, deliver_or_enqueue_n( - [{Message, true} || - {_MsgId, Message} <- dict:to_list(UAM)], - State1)} + false -> State2 = case Txn of + none -> State1; + _ -> rollback_transaction(Txn, ChPid, + State1) + end, + {ok, requeue_and_run(sets:to_list(ChAckTags), State2)} end end. @@ -373,122 +451,30 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. -persist_message(_Txn, _QName, #basic_message{is_persistent = false}) -> - ok; -persist_message(Txn, QName, Message) -> - M = Message#basic_message{ - %% don't persist any recoverable decoded properties, rebuild from properties_bin on restore - content = rabbit_binary_parser:clear_decoded_content( - Message#basic_message.content)}, - persist_work(Txn, QName, - [{publish, M, {QName, M#basic_message.guid}}]). - -persist_delivery(_QName, _Message, - true) -> - ok; -persist_delivery(_QName, #basic_message{is_persistent = false}, - _IsDelivered) -> - ok; -persist_delivery(QName, #basic_message{guid = Guid}, - _IsDelivered) -> - persist_work(none, QName, [{deliver, {QName, Guid}}]). - -persist_acks(Txn, QName, Messages) -> - persist_work(Txn, QName, - [{ack, {QName, Guid}} || #basic_message{ - guid = Guid, is_persistent = true} <- Messages]). - -persist_auto_ack(_QName, #basic_message{is_persistent = false}) -> - ok; -persist_auto_ack(QName, #basic_message{guid = Guid}) -> - %% auto-acks are always non-transactional - rabbit_persister:dirty_work([{ack, {QName, Guid}}]). - -persist_work(_Txn,_QName, []) -> - ok; -persist_work(none, _QName, WorkList) -> - rabbit_persister:dirty_work(WorkList); -persist_work(Txn, QName, WorkList) -> - mark_tx_persistent(Txn), - rabbit_persister:extend_transaction({Txn, QName}, WorkList). - -commit_work(Txn, QName) -> - do_if_persistent(fun rabbit_persister:commit_transaction/1, - Txn, QName). - -rollback_work(Txn, QName) -> - do_if_persistent(fun rabbit_persister:rollback_transaction/1, - Txn, QName). - -%% optimisation: don't do unnecessary work -%% it would be nice if this was handled by the persister -do_if_persistent(F, Txn, QName) -> - case is_tx_persistent(Txn) of - false -> ok; - true -> ok = F({Txn, QName}) - end. - -lookup_tx(Txn) -> - case get({txn, Txn}) of - undefined -> #tx{is_persistent = false, - pending_messages = [], - pending_acks = []}; - V -> V - end. - -store_tx(Txn, Tx) -> - put({txn, Txn}, Tx). - -erase_tx(Txn) -> - erase({txn, Txn}). - -all_tx() -> - [Txn || {{txn, Txn}, _} <- get()]. - -mark_tx_persistent(Txn) -> - Tx = lookup_tx(Txn), - store_tx(Txn, Tx#tx{is_persistent = true}). - -is_tx_persistent(Txn) -> - #tx{is_persistent = Res} = lookup_tx(Txn), - Res. - -record_pending_message(Txn, ChPid, Message) -> - Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), - record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}). +maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> + run_message_queue(State#q{backing_queue_state = Fun(BQS)}). + +commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {AckTags, BQS1} = + BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, BQS), + %% ChPid must be known here because of the participant management + %% by the channel. + C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), + State#q{backing_queue_state = BQS1}. + +rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), + %% Iff we removed acktags from the channel record on ack+txn then + %% we would add them back in here (would also require ChPid) + record_current_channel_tx(ChPid, none), + State#q{backing_queue_state = BQS1}. -record_pending_acks(Txn, ChPid, MsgIds) -> - Tx = #tx{pending_acks = Pending} = lookup_tx(Txn), - record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending]}). - -process_pending(Txn, ChPid, State) -> - #tx{pending_messages = PendingMessages, pending_acks = PendingAcks} = - lookup_tx(Txn), - C = #cr{unacked_messages = UAM} = lookup_ch(ChPid), - {_Acked, Remaining} = collect_messages(lists:append(PendingAcks), UAM), - store_ch_record(C#cr{unacked_messages = Remaining}), - deliver_or_enqueue_n(lists:reverse(PendingMessages), State). - -collect_messages(MsgIds, UAM) -> - lists:mapfoldl( - fun (MsgId, D) -> {dict:fetch(MsgId, D), dict:erase(MsgId, D)} end, - UAM, MsgIds). - -purge_message_buffer(QName, MessageBuffer) -> - Messages = - [[Message || {Message, _IsDelivered} <- - queue:to_list(MessageBuffer)] | - lists:map( - fun (#cr{unacked_messages = UAM}) -> - [Message || {_MsgId, Message} <- dict:to_list(UAM)] - end, - all_ch_record())], - %% the simplest, though certainly not the most obvious or - %% efficient, way to purge messages from the persister is to - %% artifically ack them. - persist_acks(none, QName, lists:append(Messages)). +subtract_acks(A, B) when is_list(B) -> + lists:foldl(fun sets:del_element/2, A, B). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -510,11 +496,10 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = none}) -> ''; i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) -> ConsumerTag; -i(messages_ready, #q{message_buffer = MessageBuffer}) -> - queue:len(MessageBuffer); +i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) -> + BQ:len(BQS); i(messages_unacknowledged, _) -> - lists:sum([dict:size(UAM) || - #cr{unacked_messages = UAM} <- all_ch_record()]); + lists:sum([sets:size(C#cr.acktags) || C <- all_ch_record()]); i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, messages_unacknowledged]]); @@ -523,6 +508,8 @@ i(consumers, State) -> i(memory, _) -> {memory, M} = process_info(self(), memory), M; +i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> + BQ:status(BQS); i(Item, _) -> throw({bad_argument, Item}). @@ -572,13 +559,8 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) -> reply(Delivered, NewState); handle_call({commit, Txn, ChPid}, From, State) -> - ok = commit_work(Txn, qname(State)), - %% optimisation: we reply straight away so the sender can continue - gen_server2:reply(From, ok), - NewState = process_pending(Txn, ChPid, State), - erase_tx(Txn), - record_current_channel_tx(ChPid, none), - noreply(NewState); + NewState = commit_transaction(Txn, From, ChPid, State), + noreply(run_message_queue(NewState)); handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues @@ -593,26 +575,19 @@ handle_call({notify_down, ChPid}, _From, State) -> handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}, - next_msg_id = NextId, - message_buffer = MessageBuffer}) -> - case queue:out(MessageBuffer) of - {{value, {Message, IsDelivered}}, BufferTail} -> - AckRequired = not(NoAck), + backing_queue_state = BQS, backing_queue = BQ}) -> + AckRequired = not NoAck, + case BQ:fetch(AckRequired, BQS) of + {empty, BQS1} -> reply(empty, State#q{backing_queue_state = BQS1}); + {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> case AckRequired of - true -> - persist_delivery(QName, Message, IsDelivered), - C = #cr{unacked_messages = UAM} = ch_record(ChPid), - NewUAM = dict:store(NextId, Message, UAM), - store_ch_record(C#cr{unacked_messages = NewUAM}); - false -> - persist_auto_ack(QName, Message) + true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), + store_ch_record( + C#cr{acktags = sets:add_element(AckTag, ChAckTags)}); + false -> ok end, - Msg = {QName, self(), NextId, IsDelivered, Message}, - reply({ok, queue:len(BufferTail), Msg}, - State#q{message_buffer = BufferTail, - next_msg_id = NextId + 1}); - {empty, _} -> - reply(empty, State) + Msg = {QName, self(), AckTag, IsDelivered, Message}, + reply({ok, Remaining, Msg}, State#q{backing_queue_state = BQS1}) end; handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, @@ -630,7 +605,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ok -> C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, - ack_required = not(NoAck)}, + ack_required = not NoAck}, store_ch_record(C#cr{consumer_count = ConsumerCount +1, limiter_pid = LimiterPid}), case ConsumerCount of @@ -692,14 +667,14 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, end; handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, - message_buffer = MessageBuffer, + backing_queue = BQ, + backing_queue_state = BQS, active_consumers = ActiveConsumers}) -> - Length = queue:len(MessageBuffer), - reply({ok, Name, Length, queue:len(ActiveConsumers)}, State); + reply({ok, Name, BQ:len(BQS), queue:len(ActiveConsumers)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, - State = #q{message_buffer = MessageBuffer}) -> - IsEmpty = queue:is_empty(MessageBuffer), + State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> + IsEmpty = BQ:is_empty(BQS), IsUnused = is_unused(State), if IfEmpty and not(IsEmpty) -> @@ -707,13 +682,13 @@ handle_call({delete, IfUnused, IfEmpty}, _From, IfUnused and not(IsUnused) -> reply({error, in_use}, State); true -> - {stop, normal, {ok, queue:len(MessageBuffer)}, State} + {stop, normal, {ok, BQ:len(BQS)}, State} end; -handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) -> - ok = purge_message_buffer(qname(State), MessageBuffer), - reply({ok, queue:len(MessageBuffer)}, - State#q{message_buffer = queue:new()}); +handle_call(purge, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {Count, BQS1} = BQ:purge(BQS), + reply({ok, Count}, State#q{backing_queue_state = BQS1}); handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, exclusive_consumer = Holder}) -> @@ -736,55 +711,54 @@ handle_call({claim_queue, ReaderPid}, _From, reply(ok, State); _ -> reply(locked, State) - end. + end; + +handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> + reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). -handle_cast({init, Recover}, State = #q{message_buffer = undefined}) -> +handle_cast({init, Recover}, + State = #q{q = #amqqueue{name = QName, durable = IsDurable}, + backing_queue = BQ, backing_queue_state = undefined}) -> ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [self()]), - Messages = case Recover of - true -> rabbit_persister:queue_content(qname(State)); - false -> [] - end, - noreply(State#q{message_buffer = queue:from_list(Messages)}); + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), + noreply(State#q{backing_queue_state = BQ:init(QName, IsDurable, Recover)}); handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), noreply(NewState); -handle_cast({ack, Txn, MsgIds, ChPid}, State) -> +handle_cast({ack, Txn, AckTags, ChPid}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case lookup_ch(ChPid) of not_found -> noreply(State); - C = #cr{unacked_messages = UAM} -> - {Acked, Remaining} = collect_messages(MsgIds, UAM), - persist_acks(Txn, qname(State), Acked), - case Txn of - none -> - store_ch_record(C#cr{unacked_messages = Remaining}); - _ -> - record_pending_acks(Txn, ChPid, MsgIds) - end, - noreply(State) + C = #cr{acktags = ChAckTags} -> + {C1, BQS1} = + case Txn of + none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), + {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)}; + _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)} + end, + store_ch_record(C1), + noreply(State #q { backing_queue_state = BQS1 }) end; handle_cast({rollback, Txn, ChPid}, State) -> - ok = rollback_work(Txn, qname(State)), - erase_tx(Txn), - record_current_channel_tx(ChPid, none), - noreply(State); + noreply(rollback_transaction(Txn, ChPid, State)); -handle_cast({requeue, MsgIds, ChPid}, State) -> +handle_cast({requeue, AckTags, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", [ChPid]), noreply(State); - C = #cr{unacked_messages = UAM} -> - {Messages, NewUAM} = collect_messages(MsgIds, UAM), - store_ch_record(C#cr{unacked_messages = NewUAM}), - noreply(deliver_or_enqueue_n( - [{Message, true} || Message <- Messages], State)) + C = #cr{acktags = ChAckTags} -> + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + store_ch_record(C#cr{acktags = ChAckTags1}), + noreply(requeue_and_run(AckTags, State)) end; handle_cast({unblock, ChPid}, State) -> @@ -819,6 +793,20 @@ handle_cast({flush, ChPid}, State) -> ok = rabbit_channel:flushed(ChPid, self()), noreply(State); +handle_cast(update_ram_duration, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + noreply(State#q{rate_timer_ref = just_measured, + backing_queue_state = BQS2}); + +handle_cast({set_ram_duration_target, Duration}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + BQS1 = BQ:set_ram_duration_target(Duration, BQS), + noreply(State#q{backing_queue_state = BQS1}); + handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State). @@ -842,6 +830,24 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> {stop, NewState} -> {stop, normal, NewState} end; +handle_info(timeout, State = #q{backing_queue = BQ}) -> + noreply(maybe_run_queue_via_backing_queue( + fun (BQS) -> BQ:sync(BQS) end, State)); + +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; + handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. + +handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> + {hibernate, State}; +handle_pre_hibernate(State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQS1 = BQ:handle_pre_hibernate(BQS), + %% no activity for a while == 0 egress and ingress rates + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), infinity), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + {hibernate, stop_rate_timer(State#q{backing_queue_state = BQS2})}. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl new file mode 100644 index 00000000..2dba00ad --- /dev/null +++ b/src/rabbit_backing_queue.erl @@ -0,0 +1,133 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_backing_queue). + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + %% Called on startup with a list of durable queue names. The + %% queues aren't being started at this point, but this call + %% allows the backing queue to perform any checking necessary for + %% the consistency of those queues, or initialise any other + %% shared resources. + {start, 1}, + + %% Initialise the backing queue and its state. + {init, 3}, + + %% Called on queue shutdown when queue isn't being deleted. + {terminate, 1}, + + %% Called when the queue is terminating and needs to delete all + %% its content. + {delete_and_terminate, 1}, + + %% Remove all messages in the queue, but not messages which have + %% been fetched and are pending acks. + {purge, 1}, + + %% Publish a message. + {publish, 2}, + + %% Called for messages which have already been passed straight + %% out to a client. The queue will be empty for these calls + %% (i.e. saves the round trip through the backing queue). + {publish_delivered, 3}, + + %% Produce the next message. + {fetch, 2}, + + %% Acktags supplied are for messages which can now be forgotten + %% about. + {ack, 2}, + + %% A publish, but in the context of a transaction. + {tx_publish, 3}, + + %% Acks, but in the context of a transaction. + {tx_ack, 3}, + + %% Undo anything which has been done in the context of the + %% specified transaction. + {tx_rollback, 2}, + + %% Commit a transaction. The Fun passed in must be called once + %% the messages have really been commited. This CPS permits the + %% possibility of commit coalescing. + {tx_commit, 3}, + + %% Reinsert messages into the queue which have already been + %% delivered and were pending acknowledgement. + {requeue, 2}, + + %% How long is my queue? + {len, 1}, + + %% Is my queue empty? + {is_empty, 1}, + + %% For the next three functions, the assumption is that you're + %% monitoring something like the ingress and egress rates of the + %% queue. The RAM duration is thus the length of time represented + %% by the messages held in RAM given the current rates. If you + %% want to ignore all of this stuff, then do so, and return 0 in + %% ram_duration/1. + + %% The target is to have no more messages in RAM than indicated + %% by the duration and the current queue rates. + {set_ram_duration_target, 2}, + + %% Optionally recalculate the duration internally (likely to be + %% just update your internal rates), and report how many seconds + %% the messages in RAM represent given the current rates of the + %% queue. + {ram_duration, 1}, + + %% Should 'sync' be called as soon as the queue process can + %% manage (either on an empty mailbox, or when a timer fires)? + {needs_sync, 1}, + + %% Called (eventually) after needs_sync returns 'true'. Note this + %% may be called more than once for each 'true' returned from + %% needs_sync. + {sync, 1}, + + %% Called immediately before the queue hibernates. + {handle_pre_hibernate, 1}, + + %% Exists for debugging purposes, to be able to expose state via + %% rabbitmqctl list_queues backing_queue_status + {status, 1} + ]; +behaviour_info(_Other) -> + undefined. diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl new file mode 100644 index 00000000..b4fd9156 --- /dev/null +++ b/src/rabbit_invariable_queue.erl @@ -0,0 +1,264 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_invariable_queue). + +-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2, + publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, + tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, + set_ram_duration_target/2, ram_duration/1, needs_sync/1, sync/1, + handle_pre_hibernate/1, status/1]). + +-export([start/1]). + +-behaviour(rabbit_backing_queue). + +-include("rabbit.hrl"). + +-record(iv_state, { queue, qname, len, pending_ack }). +-record(tx, { pending_messages, pending_acks, is_persistent }). + +-ifdef(use_specs). + +-type(ack() :: guid() | 'blank_ack'). +-type(state() :: #iv_state { queue :: queue(), + qname :: queue_name(), + len :: non_neg_integer(), + pending_ack :: dict() + }). +-include("rabbit_backing_queue_spec.hrl"). + +-endif. + +start(DurableQueues) -> + ok = rabbit_sup:start_child(rabbit_persister, [DurableQueues]). + +init(QName, IsDurable, Recover) -> + Q = queue:from_list(case IsDurable andalso Recover of + true -> rabbit_persister:queue_content(QName); + false -> [] + end), + #iv_state { queue = Q, qname = QName, len = queue:len(Q), + pending_ack = dict:new() }. + +terminate(State) -> + State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }. + +delete_and_terminate(State = #iv_state { qname = QName, pending_ack = PA }) -> + ok = persist_acks(none, QName, dict:fetch_keys(PA), PA), + {_PLen, State1} = purge(State), + terminate(State1). + +purge(State = #iv_state { len = Len, queue = Q, qname = QName }) -> + %% We do not purge messages pending acks. + {AckTags, PA} = + rabbit_misc:queue_fold( + fun ({#basic_message { is_persistent = false }, _IsDelivered}, Acc) -> + Acc; + ({Msg = #basic_message { guid = Guid }, IsDelivered}, + {AckTagsN, PAN}) -> + ok = persist_delivery(QName, Msg, IsDelivered), + {[Guid | AckTagsN], dict:store(Guid, Msg, PAN)} + end, {[], dict:new()}, Q), + ok = persist_acks(none, QName, AckTags, PA), + {Len, State #iv_state { len = 0, queue = queue:new() }}. + +publish(Msg, State = #iv_state { queue = Q, qname = QName, len = Len }) -> + ok = persist_message(none, QName, Msg), + State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }. + +publish_delivered(false, _Msg, State) -> + {blank_ack, State}; +publish_delivered(true, Msg = #basic_message { guid = Guid }, + State = #iv_state { qname = QName, len = 0, + pending_ack = PA }) -> + ok = persist_message(none, QName, Msg), + ok = persist_delivery(QName, Msg, false), + {Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}. + +fetch(_AckRequired, State = #iv_state { len = 0 }) -> + {empty, State}; +fetch(AckRequired, State = #iv_state { queue = Q, qname = QName, len = Len, + pending_ack = PA }) -> + {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} = + queue:out(Q), + Len1 = Len - 1, + ok = persist_delivery(QName, Msg, IsDelivered), + PA1 = dict:store(Guid, Msg, PA), + {AckTag, PA2} = case AckRequired of + true -> {Guid, PA1}; + false -> ok = persist_acks(none, QName, [Guid], PA1), + {blank_ack, PA} + end, + {{Msg, IsDelivered, AckTag, Len1}, + State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}. + +ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA }) -> + ok = persist_acks(none, QName, AckTags, PA), + PA1 = remove_acks(AckTags, PA), + State #iv_state { pending_ack = PA1 }. + +tx_publish(Txn, Msg, State = #iv_state { qname = QName }) -> + Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), + store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), + ok = persist_message(Txn, QName, Msg), + State. + +tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA }) -> + Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), + store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), + ok = persist_acks(Txn, QName, AckTags, PA), + State. + +tx_rollback(Txn, State = #iv_state { qname = QName }) -> + #tx { pending_acks = AckTags } = lookup_tx(Txn), + ok = do_if_persistent(fun rabbit_persister:rollback_transaction/1, + Txn, QName), + erase_tx(Txn), + {lists:flatten(AckTags), State}. + +tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA, + queue = Q, len = Len }) -> + #tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn), + ok = do_if_persistent(fun rabbit_persister:commit_transaction/1, + Txn, QName), + erase_tx(Txn), + Fun(), + AckTags1 = lists:flatten(AckTags), + PA1 = remove_acks(AckTags1, PA), + {Q1, Len1} = lists:foldr(fun (Msg, {QN, LenN}) -> + {queue:in({Msg, false}, QN), LenN + 1} + end, {Q, Len}, PubsRev), + {AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}. + +requeue(AckTags, State = #iv_state { pending_ack = PA, queue = Q, + len = Len }) -> + %% We don't need to touch the persister here - the persister will + %% already have these messages published and delivered as + %% necessary. The complication is that the persister's seq_id will + %% now be wrong, given the position of these messages in our queue + %% here. However, the persister's seq_id is only used for sorting + %% on startup, and requeue is silent as to where the requeued + %% messages should appear, thus the persister is permitted to sort + %% based on seq_id, even though it'll likely give a different + %% order to the last known state of our queue, prior to shutdown. + {Q1, Len1} = lists:foldl( + fun (Guid, {QN, LenN}) -> + {ok, Msg = #basic_message {}} = dict:find(Guid, PA), + {queue:in({Msg, true}, QN), LenN + 1} + end, {Q, Len}, AckTags), + PA1 = remove_acks(AckTags, PA), + State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }. + +len(#iv_state { len = Len }) -> Len. + +is_empty(State) -> 0 == len(State). + +set_ram_duration_target(_DurationTarget, State) -> State. + +ram_duration(State) -> {0, State}. + +needs_sync(_State) -> false. + +sync(State) -> State. + +handle_pre_hibernate(State) -> State. + +status(_State) -> []. + +%%---------------------------------------------------------------------------- + +remove_acks(AckTags, PA) -> lists:foldl(fun dict:erase/2, PA, AckTags). + +%%---------------------------------------------------------------------------- + +lookup_tx(Txn) -> + case get({txn, Txn}) of + undefined -> #tx { pending_messages = [], + pending_acks = [], + is_persistent = false }; + V -> V + end. + +store_tx(Txn, Tx) -> + put({txn, Txn}, Tx). + +erase_tx(Txn) -> + erase({txn, Txn}). + +mark_tx_persistent(Txn) -> + store_tx(Txn, (lookup_tx(Txn)) #tx { is_persistent = true }). + +is_tx_persistent(Txn) -> + (lookup_tx(Txn)) #tx.is_persistent. + +do_if_persistent(F, Txn, QName) -> + ok = case is_tx_persistent(Txn) of + false -> ok; + true -> F({Txn, QName}) + end. + +%%---------------------------------------------------------------------------- + +persist_message(_Txn, _QName, #basic_message { is_persistent = false }) -> + ok; +persist_message(Txn, QName, Msg) -> + Msg1 = Msg #basic_message { + %% don't persist any recoverable decoded properties, + %% rebuild from properties_bin on restore + content = rabbit_binary_parser:clear_decoded_content( + Msg #basic_message.content)}, + persist_work(Txn, QName, + [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]). + +persist_delivery(_QName, #basic_message { is_persistent = false }, + _IsDelivered) -> + ok; +persist_delivery(_QName, _Message, true) -> + ok; +persist_delivery(QName, #basic_message { guid = Guid }, _IsDelivered) -> + persist_work(none, QName, [{deliver, {QName, Guid}}]). + +persist_acks(Txn, QName, AckTags, PA) -> + persist_work(Txn, QName, + [{ack, {QName, Guid}} || Guid <- AckTags, + begin + {ok, Msg} = dict:find(Guid, PA), + Msg #basic_message.is_persistent + end]). + +persist_work(_Txn,_QName, []) -> + ok; +persist_work(none, _QName, WorkList) -> + rabbit_persister:dirty_work(WorkList); +persist_work(Txn, QName, WorkList) -> + mark_tx_persistent(Txn), + rabbit_persister:extend_transaction({Txn, QName}, WorkList). -- cgit v1.2.1