diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-04-09 15:37:12 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-04-09 15:37:12 +0100 |
commit | dee56eb6e77d4fd719b1dfabdf3dbeaf6e1bed39 (patch) | |
tree | 058cc2edb4e226459d80475b3fd437c02f79333e /src | |
parent | 2829795d6ed0857892cc23c161fcade87fd114cf (diff) | |
download | rabbitmq-server-git-dee56eb6e77d4fd719b1dfabdf3dbeaf6e1bed39.tar.gz |
All sorts of tidying, cosmetics, reorganisation and pruning. A veritable smörgåsbord of improvements.
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_amqqueue.erl | 53 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 278 | ||||
-rw-r--r-- | src/rabbit_backing_queue_type.erl (renamed from src/rabbit_internal_queue_type.erl) | 39 | ||||
-rw-r--r-- | src/rabbit_memory_monitor.erl | 10 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 11 | ||||
-rw-r--r-- | src/rabbit_msg_file.erl | 2 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 2 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 12 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 30 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 37 | ||||
-rw-r--r-- | src/random_distributions.erl | 38 |
11 files changed, 254 insertions, 258 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 00407824d8..235b1edbc7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -32,8 +32,8 @@ -module(rabbit_amqqueue). -export([start/0, declare/4, delete/3, purge/1]). --export([internal_declare/2, internal_delete/1, remeasure_rates/1, - set_queue_duration/2, set_maximum_since_use/2]). +-export([internal_declare/2, internal_delete/1, update_ram_duration/1, + set_ram_duration_target/2, set_maximum_since_use/2]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). @@ -41,7 +41,7 @@ -export([consumers/1, consumers_all/1]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2, unblock/2, maybe_run_queue_via_internal_queue/3, +-export([notify_sent/2, unblock/2, maybe_run_queue_via_backing_queue/3, flush_all/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -109,12 +109,12 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(maybe_run_queue_via_internal_queue/3 :: (pid(), atom(), [any()]) -> 'ok'). +-spec(maybe_run_queue_via_backing_queue/3 :: (pid(), atom(), [any()]) -> 'ok'). -spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). --spec(remeasure_rates/1 :: (pid()) -> 'ok'). --spec(set_queue_duration/2 :: (pid(), number()) -> 'ok'). +-spec(update_ram_duration/1 :: (pid()) -> 'ok'). +-spec(set_ram_duration_target/2 :: (pid(), number()) -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -124,13 +124,8 @@ %%---------------------------------------------------------------------------- start() -> - ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()), - ok = rabbit_sup:start_child( - ?TRANSIENT_MSG_STORE, rabbit_msg_store, - [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined, - fun (ok) -> finished end, ok]), DurableQueues = find_durable_queues(), - ok = rabbit_queue_index:start_persistent_msg_store(DurableQueues), + ok = rabbit_queue_index:start_msg_stores(DurableQueues), {ok,_} = supervisor:start_child( rabbit_sup, {rabbit_amqqueue_sup, @@ -152,7 +147,7 @@ find_durable_queues() -> recover_durable_queues(DurableQueues) -> Qs = lists:foldl( fun (RecoveredQ, Acc) -> - Q = start_queue_process(RecoveredQ), + Q = start_queue_process(RecoveredQ, false), %% We need to catch the case where a client %% connected to another node has deleted the queue %% (and possibly re-created it). @@ -166,16 +161,14 @@ recover_durable_queues(DurableQueues) -> [] -> false end end) of - true -> - ok = gen_server2:call(Q#amqqueue.pid, - init_internal_queue, - infinity), - [Q|Acc]; + true -> [Q|Acc]; false -> exit(Q#amqqueue.pid, shutdown), Acc end end, [], DurableQueues), - [ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs], + %% Issue inits to *all* the queues so that they all init at the same time + [ok = gen_server2:cast(Q#amqqueue.pid, init_backing_queue) || Q <- Qs], + [ok = gen_server2:call(Q#amqqueue.pid, sync) || Q <- Qs], Qs. declare(QueueName, Durable, AutoDelete, Args) -> @@ -183,7 +176,7 @@ declare(QueueName, Durable, AutoDelete, Args) -> durable = Durable, auto_delete = AutoDelete, arguments = Args, - pid = none}), + pid = none}, true), internal_declare(Q, true). internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) -> @@ -198,9 +191,6 @@ internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) -> true -> add_default_binding(Q); false -> ok end, - ok = gen_server2:call( - Q#amqqueue.pid, - init_internal_queue, infinity), Q; [_] -> not_found %% existing Q on stopped node end; @@ -223,8 +213,9 @@ store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. -start_queue_process(Q) -> - {ok, Pid} = supervisor2:start_child(rabbit_amqqueue_sup, [Q]), +start_queue_process(Q, InitBackingQueue) -> + {ok, Pid} = + supervisor2:start_child(rabbit_amqqueue_sup, [Q, InitBackingQueue]), Q#amqqueue{pid = Pid}. add_default_binding(#amqqueue{name = QueueName}) -> @@ -358,8 +349,8 @@ notify_sent(QPid, ChPid) -> unblock(QPid, ChPid) -> gen_server2:pcast(QPid, 7, {unblock, ChPid}). -maybe_run_queue_via_internal_queue(QPid, Fun, Args) -> - gen_server2:pcast(QPid, 7, {maybe_run_queue_via_internal_queue, Fun, Args}). +maybe_run_queue_via_backing_queue(QPid, Fun, Args) -> + gen_server2:pcast(QPid, 7, {maybe_run_queue_via_backing_queue, Fun, Args}). flush_all(QPids, ChPid) -> safe_pmap_ok( @@ -388,11 +379,11 @@ internal_delete(QueueName) -> ok end. -remeasure_rates(QPid) -> - gen_server2:pcast(QPid, 8, remeasure_rates). +update_ram_duration(QPid) -> + gen_server2:pcast(QPid, 8, update_ram_duration). -set_queue_duration(QPid, Duration) -> - gen_server2:pcast(QPid, 8, {set_queue_duration, Duration}). +set_ram_duration_target(QPid, Duration) -> + gen_server2:pcast(QPid, 8, {set_ram_duration_target, Duration}). set_maximum_since_use(QPid, Age) -> gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 94e8662de6..a20cd6c3b1 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -37,9 +37,9 @@ -define(UNSENT_MESSAGE_LIMIT, 100). -define(SYNC_INTERVAL, 5). %% milliseconds --define(RATES_REMEASURE_INTERVAL, 5000). +-define(RAM_DURATION_UPDATE_INTERVAL, 5000). --export([start_link/1, info_keys/0]). +-export([start_link/2, info_keys/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). @@ -53,9 +53,9 @@ owner, exclusive_consumer, has_had_consumers, - internal_queue, - internal_queue_state, - internal_queue_timeout_fun, + backing_queue, + backing_queue_state, + backing_queue_timeout_fun, next_msg_id, active_consumers, blocked_consumers, @@ -94,34 +94,34 @@ consumers, transactions, memory, - internal_queue_status + backing_queue_status ]). %%---------------------------------------------------------------------------- -start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). +start_link(Q, InitBackingQueue) -> + gen_server2:start_link(?MODULE, [Q, InitBackingQueue], []). info_keys() -> ?INFO_KEYS. %%---------------------------------------------------------------------------- -init(Q) -> +init([Q, InitBQ]) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [self()]), ok = rabbit_memory_monitor:register - (self(), {rabbit_amqqueue, set_queue_duration, [self()]}), - {ok, InternalQueueModule} = - application:get_env(queue_internal_queue_module), + (self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), + {ok, BQ} = application:get_env(backing_queue_module), {ok, #q{q = Q, owner = none, exclusive_consumer = none, has_had_consumers = false, - internal_queue = InternalQueueModule, - internal_queue_state = undefined, - internal_queue_timeout_fun = undefined, + backing_queue = BQ, + backing_queue_state = maybe_init_backing_queue(InitBQ, BQ, Q), + backing_queue_timeout_fun = undefined, next_msg_id = 1, active_consumers = queue:new(), blocked_consumers = queue:new(), @@ -129,33 +129,39 @@ init(Q) -> rate_timer_ref = undefined}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -terminate(shutdown, #q{internal_queue_state = IQS, - internal_queue = IQ}) -> +maybe_init_backing_queue( + true, BQ, #amqqueue{name = QName, durable = IsDurable}) -> + BQ:init(QName, IsDurable); +maybe_init_backing_queue(false, _BQ, _Q) -> + undefined. + +terminate(shutdown, #q{backing_queue_state = BQS, + backing_queue = BQ}) -> ok = rabbit_memory_monitor:deregister(self()), - case IQS of + case BQS of undefined -> ok; - _ -> IQ:terminate(IQS) + _ -> BQ:terminate(BQS) end; -terminate({shutdown, _}, #q{internal_queue_state = IQS, - internal_queue = IQ}) -> +terminate({shutdown, _}, #q{backing_queue_state = BQS, + backing_queue = BQ}) -> ok = rabbit_memory_monitor:deregister(self()), - case IQS of + case BQS of undefined -> ok; - _ -> IQ:terminate(IQS) + _ -> BQ:terminate(BQS) end; -terminate(_Reason, State = #q{internal_queue_state = IQS, - internal_queue = IQ}) -> +terminate(_Reason, State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> ok = rabbit_memory_monitor:deregister(self()), %% FIXME: How do we cancel active subscriptions? %% Ensure that any persisted tx messages are removed. %% TODO: wait for all in flight tx_commits to complete - case IQS of + case BQS of undefined -> ok; _ -> - IQS1 = IQ:tx_rollback( + BQS1 = BQ:tx_rollback( lists:concat([PM || #tx { pending_messages = PM } <- - all_tx_record()]), IQS), + all_tx_record()]), BQS), %% Delete from disk first. If we crash at this point, when %% a durable queue, we will be recreated at startup, %% possibly with partial content. The alternative is much @@ -163,7 +169,7 @@ terminate(_Reason, State = #q{internal_queue_state = IQS, %% would then have a race between the disk delete and a %% new queue with the same name being created and %% published to. - IQ:delete_and_terminate(IQS1) + BQ:delete_and_terminate(BQS1) end, ok = rabbit_amqqueue:internal_delete(qname(State)). @@ -182,9 +188,9 @@ noreply(NewState) -> {NewState1, Timeout} = next_state(NewState), {noreply, NewState1, Timeout}. -next_state(State = #q{internal_queue_state = IQS, - internal_queue = IQ}) -> - next_state1(ensure_rate_timer(State), IQ:needs_sync(IQS)). +next_state(State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + next_state1(ensure_rate_timer(State), BQ:needs_sync(BQS)). next_state1(State = #q{sync_timer_ref = undefined}, Callback = {_Fun, _Args}) -> {start_sync_timer(State, Callback), 0}; @@ -193,11 +199,11 @@ next_state1(State, {_Fun, _Args}) -> next_state1(State = #q{sync_timer_ref = undefined}, undefined) -> {State, hibernate}; next_state1(State, undefined) -> - {stop_sync_timer(State#q{internal_queue_timeout_fun = undefined}), hibernate}. + {stop_sync_timer(State#q{backing_queue_timeout_fun = undefined}), hibernate}. ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> - {ok, TRef} = timer:apply_after(?RATES_REMEASURE_INTERVAL, rabbit_amqqueue, - remeasure_rates, [self()]), + {ok, TRef} = timer:apply_after(?RAM_DURATION_UPDATE_INTERVAL, rabbit_amqqueue, + update_ram_duration, [self()]), State#q{rate_timer_ref = TRef}; ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) -> State#q{rate_timer_ref = undefined}; @@ -216,16 +222,16 @@ start_sync_timer(State = #q{sync_timer_ref = undefined}, Callback = {Fun, Args}) -> {ok, TRef} = timer:apply_after( ?SYNC_INTERVAL, rabbit_amqqueue, - maybe_run_queue_via_internal_queue, [self(), Fun, Args]), - State#q{sync_timer_ref = TRef, internal_queue_timeout_fun = Callback}. + maybe_run_queue_via_backing_queue, [self(), Fun, Args]), + State#q{sync_timer_ref = TRef, backing_queue_timeout_fun = Callback}. stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> {ok, cancel} = timer:cancel(TRef), - State#q{sync_timer_ref = undefined, internal_queue_timeout_fun = undefined}. + State#q{sync_timer_ref = undefined, backing_queue_timeout_fun = undefined}. -assert_invariant(#q{active_consumers = AC, internal_queue_state = IQS, - internal_queue = IQ}) -> - true = (queue:is_empty(AC) orelse IQ:is_empty(IQS)). +assert_invariant(#q{active_consumers = AC, backing_queue_state = BQS, + backing_queue = BQ}) -> + true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -340,73 +346,73 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) -> not IsEmpty. deliver_from_queue_deliver(AckRequired, {false, AutoAcks}, - State = #q{internal_queue_state = IQS, - internal_queue = IQ}) -> - {{Message, IsDelivered, AckTag, Remaining}, IQS1} = IQ:fetch(IQS), + State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + {{Message, IsDelivered, AckTag, Remaining}, BQS1} = BQ:fetch(BQS), AutoAcks1 = case AckRequired of true -> AutoAcks; false -> [AckTag | AutoAcks] end, {{Message, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1}, - State #q { internal_queue_state = IQS1 }}. + State #q { backing_queue_state = BQS1 }}. -run_message_queue(State = #q{internal_queue_state = IQS, - internal_queue = IQ}) -> +run_message_queue(State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> Funs = { fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3 }, - IsEmpty = IQ:is_empty(IQS), + IsEmpty = BQ:is_empty(BQS), {{_IsEmpty1, AutoAcks}, State1} = deliver_msgs_to_consumers(Funs, {IsEmpty, []}, State), - IQS1 = IQ:ack(AutoAcks, State1 #q.internal_queue_state), - State1 #q { internal_queue_state = IQS1 }. + BQS1 = BQ:ack(AutoAcks, State1 #q.backing_queue_state), + State1 #q { backing_queue_state = BQS1 }. -attempt_delivery(none, _ChPid, Message, State = #q{internal_queue = IQ}) -> +attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1) -> {AckTag, State2} = case AckRequired of true -> - {AckTag1, IQS} = - IQ:publish_delivered( - Message, State1 #q.internal_queue_state), - {AckTag1, State1 #q { internal_queue_state = IQS }}; + {AckTag1, BQS} = + BQ:publish_delivered( + Message, State1 #q.backing_queue_state), + {AckTag1, State1 #q { backing_queue_state = BQS }}; false -> {noack, State1} end, {{Message, false, AckTag}, true, State2} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, State = #q{internal_queue = IQ}) -> - IQS = IQ:tx_publish(Message, State #q.internal_queue_state), +attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> + BQS = BQ:tx_publish(Message, State #q.backing_queue_state), record_pending_message(Txn, ChPid, Message), - {true, State #q { internal_queue_state = IQS }}. + {true, State #q { backing_queue_state = BQS }}. -deliver_or_enqueue(Txn, ChPid, Message, State = #q{internal_queue = IQ}) -> +deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> case attempt_delivery(Txn, ChPid, Message, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - IQS = IQ:publish(Message, State #q.internal_queue_state), - {false, NewState #q { internal_queue_state = IQS }} + BQS = BQ:publish(Message, State #q.backing_queue_state), + {false, NewState #q { backing_queue_state = BQS }} end. %% all these messages have already been delivered at least once and %% not ack'd, but need to be either redelivered or requeued deliver_or_requeue_n([], State) -> State; -deliver_or_requeue_n(MsgsWithAcks, State = #q{internal_queue = IQ}) -> +deliver_or_requeue_n(MsgsWithAcks, State = #q{backing_queue = BQ}) -> Funs = { fun deliver_or_requeue_msgs_pred/2, fun deliver_or_requeue_msgs_deliver/3 }, {{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} = deliver_msgs_to_consumers( Funs, {length(MsgsWithAcks), [], MsgsWithAcks}, State), - IQS = IQ:ack(AutoAcks, NewState #q.internal_queue_state), + BQS = BQ:ack(AutoAcks, NewState #q.backing_queue_state), case OutstandingMsgs of - [] -> NewState #q { internal_queue_state = IQS }; - _ -> IQS1 = IQ:requeue(OutstandingMsgs, IQS), - NewState #q { internal_queue_state = IQS1 } + [] -> NewState #q { backing_queue_state = BQS }; + _ -> BQS1 = BQ:requeue(OutstandingMsgs, BQS), + NewState #q { backing_queue_state = BQS1 } end. deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) -> @@ -518,11 +524,11 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. -maybe_run_queue_via_internal_queue(Fun, Args, - State = #q{internal_queue_state = IQS, - internal_queue = IQ}) -> - {RunQueue, IQS1} = apply(IQ, Fun, Args ++ [IQS]), - State1 = State#q{internal_queue_state = IQS1}, +maybe_run_queue_via_backing_queue(Fun, Args, + State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + {RunQueue, BQS1} = apply(BQ, Fun, Args ++ [BQS]), + State1 = State#q{backing_queue_state = BQS1}, case RunQueue of true -> run_message_queue(State1); false -> State1 @@ -557,7 +563,7 @@ record_pending_acks(Txn, ChPid, MsgIds) -> store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], ch_pid = ChPid}). -commit_transaction(Txn, From, State = #q{internal_queue = IQ}) -> +commit_transaction(Txn, From, State = #q{backing_queue = BQ}) -> #tx{ch_pid = ChPid, pending_messages = PendingMessages, pending_acks = PendingAcks} = lookup_tx(Txn), PendingMessagesOrdered = lists:reverse(PendingMessages), @@ -572,16 +578,16 @@ commit_transaction(Txn, From, State = #q{internal_queue = IQ}) -> store_ch_record(C#cr{unacked_messages = Remaining}), [AckTag || {_Message, AckTag} <- MsgsWithAcks] end, - {RunQueue, IQS} = IQ:tx_commit(PendingMessagesOrdered, Acks, From, - State#q.internal_queue_state), + {RunQueue, BQS} = BQ:tx_commit(PendingMessagesOrdered, Acks, From, + State#q.backing_queue_state), erase_tx(Txn), - {RunQueue, State#q{internal_queue_state = IQS}}. + {RunQueue, State#q{backing_queue_state = BQS}}. -rollback_transaction(Txn, State = #q{internal_queue = IQ}) -> +rollback_transaction(Txn, State = #q{backing_queue = BQ}) -> #tx{pending_messages = PendingMessages} = lookup_tx(Txn), - IQS = IQ:tx_rollback(PendingMessages, State #q.internal_queue_state), + BQS = BQ:tx_rollback(PendingMessages, State #q.backing_queue_state), erase_tx(Txn), - State#q{internal_queue_state = IQS}. + State#q{backing_queue_state = BQS}. collect_messages(MsgIds, UAM) -> lists:mapfoldl( @@ -608,8 +614,8 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = none}) -> ''; i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) -> ConsumerTag; -i(messages_ready, #q{internal_queue_state = IQS, internal_queue = IQ}) -> - IQ:len(IQS); +i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) -> + BQ:len(BQS); i(messages_unacknowledged, _) -> lists:sum([dict:size(UAM) || #cr{unacked_messages = UAM} <- all_ch_record()]); @@ -630,26 +636,13 @@ i(transactions, _) -> i(memory, _) -> {memory, M} = process_info(self(), memory), M; -i(internal_queue_status, #q{internal_queue_state = IQS, internal_queue = IQ}) -> - IQ:status(IQS); +i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> + BQ:status(BQS); i(Item, _) -> throw({bad_argument, Item}). %--------------------------------------------------------------------------- -handle_call(init_internal_queue, From, State = - #q{internal_queue_state = undefined, internal_queue = IQ, - q = #amqqueue{name = QName, durable = IsDurable}}) -> - gen_server2:reply(From, ok), - PersistentStore = case IsDurable of - true -> ?PERSISTENT_MSG_STORE; - false -> ?TRANSIENT_MSG_STORE - end, - noreply(State#q{internal_queue_state = IQ:init(QName, PersistentStore)}); - -handle_call(init_internal_queue, _From, State) -> - reply(ok, State); - handle_call(sync, _From, State) -> reply(ok, State); @@ -713,24 +706,24 @@ handle_call({notify_down, ChPid}, _From, State) -> handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}, next_msg_id = NextId, - internal_queue_state = IQS, internal_queue = IQ}) -> - case IQ:fetch(IQS) of - {empty, IQS1} -> reply(empty, State #q { internal_queue_state = IQS1 }); - {{Message, IsDelivered, AckTag, Remaining}, IQS1} -> + backing_queue_state = BQS, backing_queue = BQ}) -> + case BQ:fetch(BQS) of + {empty, BQS1} -> reply(empty, State #q { backing_queue_state = BQS1 }); + {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> AckRequired = not(NoAck), - IQS2 = + BQS2 = case AckRequired of true -> C = #cr{unacked_messages = UAM} = ch_record(ChPid), NewUAM = dict:store(NextId, {Message, AckTag}, UAM), store_ch_record(C#cr{unacked_messages = NewUAM}), - IQS1; + BQS1; false -> - IQ:ack([AckTag], IQS1) + BQ:ack([AckTag], BQS1) end, Msg = {QName, self(), NextId, IsDelivered, Message}, reply({ok, Remaining, Msg}, - State #q { next_msg_id = NextId + 1, internal_queue_state = IQS2 }) + State #q { next_msg_id = NextId + 1, backing_queue_state = BQS2 }) end; handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, @@ -810,14 +803,14 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, end; handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, - internal_queue_state = IQS, - internal_queue = IQ, + backing_queue_state = BQS, + backing_queue = BQ, active_consumers = ActiveConsumers}) -> - reply({ok, Name, IQ:len(IQS), queue:len(ActiveConsumers)}, State); + reply({ok, Name, BQ:len(BQS), queue:len(ActiveConsumers)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, - State = #q{internal_queue_state = IQS, internal_queue = IQ}) -> - Length = IQ:len(IQS), + State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> + Length = BQ:len(BQS), IsEmpty = Length == 0, IsUnused = is_unused(State), if @@ -829,9 +822,9 @@ handle_call({delete, IfUnused, IfEmpty}, _From, {stop, normal, {ok, Length}, State} end; -handle_call(purge, _From, State = #q{internal_queue = IQ}) -> - {Count, IQS} = IQ:purge(State#q.internal_queue_state), - reply({ok, Count}, State#q{internal_queue_state = IQS}); +handle_call(purge, _From, State = #q{backing_queue = BQ}) -> + {Count, BQS} = BQ:purge(State#q.backing_queue_state), + reply({ok, Count}, State#q{backing_queue_state = BQS}); handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, exclusive_consumer = Holder}) -> @@ -856,12 +849,21 @@ handle_call({claim_queue, ReaderPid}, _From, reply(locked, State) end. + +handle_cast(init_backing_queue, State = #q{backing_queue_state = undefined, + backing_queue = BQ, q = Q}) -> + noreply(State#q{backing_queue_state = + maybe_init_backing_queue(true, BQ, Q)}); + +handle_cast(init_backing_queue, State) -> + noreply(State); + handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), noreply(NewState); -handle_cast({ack, Txn, MsgIds, ChPid}, State = #q{internal_queue = IQ}) -> +handle_cast({ack, Txn, MsgIds, ChPid}, State = #q{backing_queue = BQ}) -> case lookup_ch(ChPid) of not_found -> noreply(State); @@ -869,10 +871,10 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State = #q{internal_queue = IQ}) -> case Txn of none -> {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM), - IQS = IQ:ack([AckTag || {_Message, AckTag} <- MsgWithAcks], - State #q.internal_queue_state), + BQS = BQ:ack([AckTag || {_Message, AckTag} <- MsgWithAcks], + State #q.backing_queue_state), store_ch_record(C#cr{unacked_messages = Remaining}), - noreply(State #q { internal_queue_state = IQS }); + noreply(State #q { backing_queue_state = BQS }); _ -> record_pending_acks(Txn, ChPid, MsgIds), noreply(State) @@ -906,8 +908,8 @@ handle_cast({notify_sent, ChPid}, State) -> C#cr{unsent_message_count = Count - 1} end)); -handle_cast({maybe_run_queue_via_internal_queue, Fun, Args}, State) -> - noreply(maybe_run_queue_via_internal_queue(Fun, Args, State)); +handle_cast({maybe_run_queue_via_backing_queue, Fun, Args}, State) -> + noreply(maybe_run_queue_via_backing_queue(Fun, Args, State)); handle_cast({limit, ChPid, LimiterPid}, State) -> noreply( @@ -929,21 +931,21 @@ handle_cast({flush, ChPid}, State) -> ok = rabbit_channel:flushed(ChPid, self()), noreply(State); -handle_cast(remeasure_rates, State = #q{internal_queue_state = IQS, - internal_queue = IQ}) -> - IQS1 = IQ:remeasure_rates(IQS), - RamDuration = IQ:queue_duration(IQS1), +handle_cast(update_ram_duration, State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + BQS1 = BQ:update_ram_duration(BQS), + RamDuration = BQ:ram_duration(BQS1), DesiredDuration = - rabbit_memory_monitor:report_queue_duration(self(), RamDuration), - IQS2 = IQ:set_queue_duration_target(DesiredDuration, IQS1), + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), noreply(State#q{rate_timer_ref = just_measured, - internal_queue_state = IQS2}); + backing_queue_state = BQS2}); -handle_cast({set_queue_duration, Duration}, - State = #q{internal_queue_state = IQS, - internal_queue = IQ}) -> - IQS1 = IQ:set_queue_duration_target(Duration, IQS), - noreply(State#q{internal_queue_state = IQS1}); +handle_cast({set_ram_duration_target, Duration}, + State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + BQS1 = BQ:set_ram_duration_target(Duration, BQS), + noreply(State#q{backing_queue_state = BQS1}); handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -968,12 +970,12 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> {stop, NewState} -> {stop, normal, NewState} end; -handle_info(timeout, State = #q{internal_queue_timeout_fun = undefined}) -> +handle_info(timeout, State = #q{backing_queue_timeout_fun = undefined}) -> noreply(State); -handle_info(timeout, State = #q{internal_queue_timeout_fun = {Fun, Args}}) -> - noreply(maybe_run_queue_via_internal_queue( - Fun, Args, State#q{internal_queue_timeout_fun = undefined})); +handle_info(timeout, State = #q{backing_queue_timeout_fun = {Fun, Args}}) -> + noreply(maybe_run_queue_via_backing_queue( + Fun, Args, State#q{backing_queue_timeout_fun = undefined})); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; @@ -982,11 +984,11 @@ handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. -handle_pre_hibernate(State = #q{internal_queue_state = IQS, - internal_queue = IQ}) -> - IQS1 = IQ:handle_pre_hibernate(IQS), +handle_pre_hibernate(State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + BQS1 = BQ:handle_pre_hibernate(BQS), %% no activity for a while == 0 egress and ingress rates DesiredDuration = - rabbit_memory_monitor:report_queue_duration(self(), infinity), - IQS2 = IQ:set_queue_duration_target(DesiredDuration, IQS1), - {hibernate, stop_rate_timer(State#q{internal_queue_state = IQS2})}. + rabbit_memory_monitor:report_ram_duration(self(), infinity), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + {hibernate, stop_rate_timer(State#q{backing_queue_state = BQS2})}. diff --git a/src/rabbit_internal_queue_type.erl b/src/rabbit_backing_queue_type.erl index 48d9314df2..46299d0207 100644 --- a/src/rabbit_internal_queue_type.erl +++ b/src/rabbit_backing_queue_type.erl @@ -29,14 +29,14 @@ %% Contributor(s): ______________________________________. %% --module(rabbit_internal_queue_type). +-module(rabbit_backing_queue_type). -export([behaviour_info/1]). behaviour_info(callbacks) -> [ - %% Called with queue name and the persistent msg_store to - %% use. Transient store is in ?TRANSIENT_MSG_STORE + %% Called with queue name and a boolean to indicate whether or + %% not the queue is durable. {init, 2}, %% Called on queue shutdown when queue isn't being deleted @@ -58,27 +58,52 @@ behaviour_info(callbacks) -> %% (i.e. saves the round trip through the internal queue). {publish_delivered, 2}, + %% Produce the next message {fetch, 1}, + %% Acktags supplied are for messages which can now be forgotten + %% about {ack, 2}, + %% A publish, but in the context of a transaction. {tx_publish, 2}, + + %% Undo anything which has been done by the tx_publish of the + %% indicated messages. {tx_rollback, 2}, + + %% Commit these publishes and acktags. The publishes you will + %% have previously seen in calls to tx_publish. {tx_commit, 4}, %% Reinsert messages into the queue which have already been %% delivered and were (likely) pending acks.q {requeue, 2}, + %% How long is my queue? {len, 1}, + %% Is my queue empty? {is_empty, 1}, - {set_queue_duration_target, 2}, + %% For the next three functions, the assumption is that you're + %% monitoring something like the ingress and egress rates of the + %% queue. The RAM duration is thus the length of time represented + %% by the messages held in RAM given the current rates. If you + %% want to ignore all of this stuff, then do so, and return 0 in + %% ram_duration/1. + + %% The target is to have no more messages in RAM than indicated + %% by the duration and the current queue rates. + {set_ram_duration_target, 2}, - {remeasure_rates, 1}, + %% Recalculate the duration internally (likely to be just update + %% your internal rates). + {update_ram_duration, 1}, - {queue_duration, 1}, + %% Report how many seconds the messages in RAM represent given + %% the current rates of the queue. + {ram_duration, 1}, %% Can return 'undefined' or a function atom name plus list of %% arguments to be invoked in the internal queue module as soon @@ -90,7 +115,7 @@ behaviour_info(callbacks) -> {handle_pre_hibernate, 1}, %% Exists for debugging purposes, to be able to expose state via - %% rabbitmqctl list_queues internal_queue_status + %% rabbitmqctl list_queues backing_queue_status {status, 1} ]; behaviour_info(_Other) -> diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index a76600fe15..91e97ffe49 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -40,7 +40,7 @@ -behaviour(gen_server2). -export([start_link/0, update/0, register/2, deregister/1, - report_queue_duration/2, stop/0]). + report_ram_duration/2, stop/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -90,7 +90,7 @@ -spec(update/0 :: () -> 'ok'). -spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok'). -spec(deregister/1 :: (pid()) -> 'ok'). --spec(report_queue_duration/2 :: (pid(), float() | 'infinity') -> number()). +-spec(report_ram_duration/2 :: (pid(), float() | 'infinity') -> number()). -spec(stop/0 :: () -> 'ok'). -endif. @@ -111,9 +111,9 @@ register(Pid, MFA = {_M, _F, _A}) -> deregister(Pid) -> gen_server2:cast(?SERVER, {deregister, Pid}). -report_queue_duration(Pid, QueueDuration) -> +report_ram_duration(Pid, QueueDuration) -> gen_server2:call(?SERVER, - {report_queue_duration, Pid, QueueDuration}, infinity). + {report_ram_duration, Pid, QueueDuration}, infinity). stop() -> gen_server2:cast(?SERVER, stop). @@ -143,7 +143,7 @@ init([]) -> memory_limit = MemoryLimit, desired_duration = infinity })}. -handle_call({report_queue_duration, Pid, QueueDuration}, From, +handle_call({report_ram_duration, Pid, QueueDuration}, From, State = #state { queue_duration_sum = Sum, queue_duration_count = Count, queue_durations = Durations, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 340f308f55..6d5ab2f0af 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -60,6 +60,7 @@ -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). -export([recursive_delete/1, dict_cons/3, unlink_and_capture_exit/1]). +-export([geometric/1]). -import(mnesia). -import(lists). @@ -129,14 +130,18 @@ -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). --spec(ceil/1 :: (number()) -> number()). +-spec(ceil/1 :: (number()) -> integer()). -spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B). -spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()). -spec(pid_to_string/1 :: (pid()) -> string()). -spec(string_to_pid/1 :: (string()) -> pid()). +-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt'). +-spec(version_compare/3 :: (string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) -> + boolean()). -spec(recursive_delete/1 :: (string()) -> 'ok' | {'error', any()}). -spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). -spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok'). +-spec(geometric/1 :: (float()) -> non_neg_integer()). -endif. @@ -636,3 +641,7 @@ unlink_and_capture_exit(Pid) -> receive {'EXIT', Pid, _} -> ok after 0 -> ok end. + +geometric(P) when 0.0 < P andalso P < 1.0 -> + U = 1.0 - random:uniform(), + ceil(math:log(U) / math:log(1.0 - P)). diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index 267cb633d8..2c7ea89302 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -46,7 +46,7 @@ %%---------------------------------------------------------------------------- --include("rabbit.hrl"). +-include("rabbit_msg_store.hrl"). -ifdef(use_specs). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index b2db0ea51a..2af16bc1cc 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1444,7 +1444,7 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, file_summary_ets = FileSummaryEts }) when (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION -> First = ets:first(FileSummaryEts), - N = random_distributions:geometric(?GEOMETRIC_P), + N = rabbit_misc:geometric(?GEOMETRIC_P), case find_files_to_gc(FileSummaryEts, N, First) of undefined -> State; diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index f5f49cf4f4..f7f265afe2 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -35,7 +35,7 @@ write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1, read_segment_entries/2, next_segment_boundary/1, segment_size/0, find_lowest_seq_id_seg_and_next_seq_id/1, - start_persistent_msg_store/1]). + start_msg_stores/1]). -export([queue_index_walker_reader/3]). %% for internal use only @@ -172,6 +172,7 @@ }). -include("rabbit.hrl"). +-include("rabbit_variable_queue.hrl"). %%---------------------------------------------------------------------------- @@ -210,7 +211,7 @@ -spec(segment_size/0 :: () -> non_neg_integer()). -spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). --spec(start_persistent_msg_store/1 :: ([amqqueue()]) -> 'ok'). +-spec(start_msg_stores/1 :: ([amqqueue()]) -> 'ok'). -endif. @@ -427,7 +428,12 @@ find_lowest_seq_id_seg_and_next_seq_id(State) -> end, {LowSeqIdSeg, NextSeqId, State}. -start_persistent_msg_store(DurableQueues) -> +start_msg_stores(DurableQueues) -> + ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()), + ok = rabbit_sup:start_child( + ?TRANSIENT_MSG_STORE, rabbit_msg_store, + [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined, + fun (ok) -> finished end, ok]), DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue #amqqueue.name), Queue #amqqueue.name} || Queue <- DurableQueues ]), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index a97730e0b1..29699829dc 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -41,6 +41,7 @@ -import(lists). -include("rabbit.hrl"). +-include("rabbit_variable_queue.hrl"). -include_lib("kernel/include/file.hrl"). test_content_prop_roundtrip(Datum, Binary) -> @@ -1203,8 +1204,7 @@ test_amqqueue(Durable) -> pid = none}. empty_test_queue() -> - ok = start_transient_msg_store(), - ok = rabbit_queue_index:start_persistent_msg_store([]), + ok = rabbit_queue_index:start_msg_stores([]), {0, _PRef, _TRef, _Terms, Qi1} = rabbit_queue_index:init(test_queue(), false), _Qi2 = rabbit_queue_index:terminate_and_erase(Qi1), ok. @@ -1266,8 +1266,7 @@ test_queue_index() -> %% call terminate twice to prove it's idempotent _Qi5 = rabbit_queue_index:terminate([], rabbit_queue_index:terminate([], Qi4)), ok = stop_msg_store(), - ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]), - ok = start_transient_msg_store(), + ok = rabbit_queue_index:start_msg_stores([test_amqqueue(true)]), %% should get length back as 0, as all the msgs were transient {0, _PRef1, _TRef1, _Terms1, Qi6} = rabbit_queue_index:init(test_queue(), false), {0, 0, Qi7} = @@ -1280,8 +1279,7 @@ test_queue_index() -> lists:reverse(SeqIdsMsgIdsB)), _Qi11 = rabbit_queue_index:terminate([], Qi10), ok = stop_msg_store(), - ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]), - ok = start_transient_msg_store(), + ok = rabbit_queue_index:start_msg_stores([test_amqqueue(true)]), %% should get length back as 10000 LenB = length(SeqIdsB), {LenB, _PRef2, _TRef2, _Terms2, Qi12} = rabbit_queue_index:init(test_queue(), false), @@ -1298,8 +1296,7 @@ test_queue_index() -> rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17), _Qi19 = rabbit_queue_index:terminate([], Qi18), ok = stop_msg_store(), - ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]), - ok = start_transient_msg_store(), + ok = rabbit_queue_index:start_msg_stores([test_amqqueue(true)]), %% should get length back as 0 because all persistent msgs have been acked {0, _PRef3, _TRef3, _Terms3, Qi20} = rabbit_queue_index:init(test_queue(), false), _Qi21 = rabbit_queue_index:terminate_and_erase(Qi20), @@ -1340,8 +1337,7 @@ test_queue_index() -> Qi40 = queue_index_flush_journal(Qi39), _Qi41 = rabbit_queue_index:terminate_and_erase(Qi40), ok = stop_msg_store(), - ok = rabbit_queue_index:start_persistent_msg_store([]), - ok = start_transient_msg_store(), + ok = rabbit_queue_index:start_msg_stores([]), ok = stop_msg_store(), passed. @@ -1370,7 +1366,7 @@ assert_prop(List, Prop, Value) -> fresh_variable_queue() -> stop_msg_store(), ok = empty_test_queue(), - VQ = rabbit_variable_queue:init(test_queue(), ?PERSISTENT_MSG_STORE), + VQ = rabbit_variable_queue:init(test_queue(), true), S0 = rabbit_variable_queue:status(VQ), assert_prop(S0, len, 0), assert_prop(S0, q1, 0), @@ -1391,7 +1387,7 @@ test_variable_queue_dynamic_duration_change() -> %% start by sending in a couple of segments worth Len1 = 2*SegmentSize, VQ1 = variable_queue_publish(false, Len1, VQ0), - VQ2 = rabbit_variable_queue:remeasure_rates(VQ1), + VQ2 = rabbit_variable_queue:update_ram_duration(VQ1), {ok, _TRef} = timer:send_after(1000, {duration, 60, fun (V) -> (V*0.75)-1 end}), VQ3 = test_variable_queue_dynamic_duration_change_f(Len1, VQ2), @@ -1427,9 +1423,9 @@ test_variable_queue_dynamic_duration_change_f(Len, VQ0) -> _ -> Fun end, {ok, _TRef} = timer:send_after(1000, {duration, N1, Fun1}), - VQ4 = rabbit_variable_queue:remeasure_rates(VQ3), + VQ4 = rabbit_variable_queue:update_ram_duration(VQ3), VQ5 = %% /37 otherwise the duration is just to high to stress things - rabbit_variable_queue:set_queue_duration_target(N/37, VQ4), + rabbit_variable_queue:set_ram_duration_target(N/37, VQ4), io:format("~p:~n~p~n~n", [N, rabbit_variable_queue:status(VQ5)]), test_variable_queue_dynamic_duration_change_f(Len, VQ5) after 0 -> @@ -1441,8 +1437,8 @@ test_variable_queue_partial_segments_delta_thing() -> HalfSegment = SegmentSize div 2, VQ0 = fresh_variable_queue(), VQ1 = variable_queue_publish(true, SegmentSize + HalfSegment, VQ0), - VQ2 = rabbit_variable_queue:remeasure_rates(VQ1), - VQ3 = rabbit_variable_queue:set_queue_duration_target(0, VQ2), + VQ2 = rabbit_variable_queue:update_ram_duration(VQ1), + VQ3 = rabbit_variable_queue:set_ram_duration_target(0, VQ2), %% one segment in q3 as betas, and half a segment in delta S3 = rabbit_variable_queue:status(VQ3), io:format("~p~n", [S3]), @@ -1450,7 +1446,7 @@ test_variable_queue_partial_segments_delta_thing() -> SegmentSize + HalfSegment}), assert_prop(S3, q3, SegmentSize), assert_prop(S3, len, SegmentSize + HalfSegment), - VQ4 = rabbit_variable_queue:set_queue_duration_target(infinity, VQ3), + VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(true, 1, VQ4), %% should have 1 alpha, but it's in the same segment as the deltas S5 = rabbit_variable_queue:status(VQ5), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c01ab5a450..b798a2c9c4 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -32,8 +32,8 @@ -module(rabbit_variable_queue). -export([init/2, terminate/1, publish/2, publish_delivered/2, - set_queue_duration_target/2, remeasure_rates/1, - queue_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1, + set_ram_duration_target/2, update_ram_duration/1, + ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1, delete_and_terminate/1, requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, needs_sync/1, handle_pre_hibernate/1, status/1]). @@ -133,7 +133,7 @@ %%---------------------------------------------------------------------------- --behaviour(rabbit_internal_queue_type). +-behaviour(rabbit_backing_queue_type). -record(vqstate, { q1, @@ -189,6 +189,7 @@ -define(RAM_INDEX_BATCH_SIZE, 64). -include("rabbit.hrl"). +-include("rabbit_variable_queue.hrl"). %%---------------------------------------------------------------------------- @@ -236,7 +237,7 @@ {boolean(), state()}). -spec(tx_commit_index/1 :: (state()) -> {boolean(), state()}). --include("rabbit_internal_queue_type_spec.hrl"). +-include("rabbit_backing_queue_type_spec.hrl"). -endif. @@ -251,7 +252,11 @@ %% Public API %%---------------------------------------------------------------------------- -init(QueueName, PersistentStore) -> +init(QueueName, IsDurable) -> + PersistentStore = case IsDurable of + true -> ?PERSISTENT_MSG_STORE; + false -> ?TRANSIENT_MSG_STORE + end, MsgStoreRecovered = rabbit_msg_store:successfully_recovered_state(PersistentStore), {DeltaCount, PRef, TRef, Terms, IndexState} = @@ -344,7 +349,7 @@ publish_delivered(Msg = #basic_message { guid = MsgId, {ack_not_on_disk, State2} end. -set_queue_duration_target( +set_ram_duration_target( DurationTarget, State = #vqstate { avg_egress_rate = AvgEgressRate, avg_ingress_rate = AvgIngressRate, target_ram_msg_count = TargetRamMsgCount @@ -364,18 +369,18 @@ set_queue_duration_target( false -> reduce_memory_use(State1) end. -remeasure_rates(State = #vqstate { egress_rate = Egress, - ingress_rate = Ingress, - rate_timestamp = Timestamp, - in_counter = InCount, - out_counter = OutCount, - ram_msg_count = RamMsgCount, - duration_target = DurationTarget }) -> +update_ram_duration(State = #vqstate { egress_rate = Egress, + ingress_rate = Ingress, + rate_timestamp = Timestamp, + in_counter = InCount, + out_counter = OutCount, + ram_msg_count = RamMsgCount, + duration_target = DurationTarget }) -> Now = now(), {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), - set_queue_duration_target( + set_ram_duration_target( DurationTarget, State #vqstate { egress_rate = Egress1, avg_egress_rate = AvgEgressRate, @@ -385,7 +390,7 @@ remeasure_rates(State = #vqstate { egress_rate = Egress, ram_msg_count_prev = RamMsgCount, out_counter = 0, in_counter = 0 }). -queue_duration(#vqstate { avg_egress_rate = AvgEgressRate, +ram_duration(#vqstate { avg_egress_rate = AvgEgressRate, avg_ingress_rate = AvgIngressRate, ram_msg_count = RamMsgCount, ram_msg_count_prev = RamMsgCountPrev }) -> @@ -594,7 +599,7 @@ tx_commit(Pubs, AckTags, From, State = Self = self(), ok = rabbit_msg_store:sync( ?PERSISTENT_MSG_STORE, PersistentMsgIds, - fun () -> ok = rabbit_amqqueue:maybe_run_queue_via_internal_queue( + fun () -> ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue( Self, tx_commit_post_msg_store, [IsTransientPubs, Pubs, AckTags, From]) end), diff --git a/src/random_distributions.erl b/src/random_distributions.erl deleted file mode 100644 index 0f7d115cac..0000000000 --- a/src/random_distributions.erl +++ /dev/null @@ -1,38 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(random_distributions). - --export([geometric/1]). - -geometric(P) when 0.0 < P andalso P < 1.0 -> - U = 1.0 - random:uniform(), - rabbit_misc:ceil(math:log(U) / math:log(1.0 - P)). |