summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-09 15:37:12 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-09 15:37:12 +0100
commitdee56eb6e77d4fd719b1dfabdf3dbeaf6e1bed39 (patch)
tree058cc2edb4e226459d80475b3fd437c02f79333e /src
parent2829795d6ed0857892cc23c161fcade87fd114cf (diff)
downloadrabbitmq-server-git-dee56eb6e77d4fd719b1dfabdf3dbeaf6e1bed39.tar.gz
All sorts of tidying, cosmetics, reorganisation and pruning. A veritable smörgåsbord of improvements.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl53
-rw-r--r--src/rabbit_amqqueue_process.erl278
-rw-r--r--src/rabbit_backing_queue_type.erl (renamed from src/rabbit_internal_queue_type.erl)39
-rw-r--r--src/rabbit_memory_monitor.erl10
-rw-r--r--src/rabbit_misc.erl11
-rw-r--r--src/rabbit_msg_file.erl2
-rw-r--r--src/rabbit_msg_store.erl2
-rw-r--r--src/rabbit_queue_index.erl12
-rw-r--r--src/rabbit_tests.erl30
-rw-r--r--src/rabbit_variable_queue.erl37
-rw-r--r--src/random_distributions.erl38
11 files changed, 254 insertions, 258 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 00407824d8..235b1edbc7 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -32,8 +32,8 @@
-module(rabbit_amqqueue).
-export([start/0, declare/4, delete/3, purge/1]).
--export([internal_declare/2, internal_delete/1, remeasure_rates/1,
- set_queue_duration/2, set_maximum_since_use/2]).
+-export([internal_declare/2, internal_delete/1, update_ram_duration/1,
+ set_ram_duration_target/2, set_maximum_since_use/2]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
@@ -41,7 +41,7 @@
-export([consumers/1, consumers_all/1]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
--export([notify_sent/2, unblock/2, maybe_run_queue_via_internal_queue/3,
+-export([notify_sent/2, unblock/2, maybe_run_queue_via_backing_queue/3,
flush_all/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -109,12 +109,12 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(maybe_run_queue_via_internal_queue/3 :: (pid(), atom(), [any()]) -> 'ok').
+-spec(maybe_run_queue_via_backing_queue/3 :: (pid(), atom(), [any()]) -> 'ok').
-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
--spec(remeasure_rates/1 :: (pid()) -> 'ok').
--spec(set_queue_duration/2 :: (pid(), number()) -> 'ok').
+-spec(update_ram_duration/1 :: (pid()) -> 'ok').
+-spec(set_ram_duration_target/2 :: (pid(), number()) -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -124,13 +124,8 @@
%%----------------------------------------------------------------------------
start() ->
- ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()),
- ok = rabbit_sup:start_child(
- ?TRANSIENT_MSG_STORE, rabbit_msg_store,
- [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined,
- fun (ok) -> finished end, ok]),
DurableQueues = find_durable_queues(),
- ok = rabbit_queue_index:start_persistent_msg_store(DurableQueues),
+ ok = rabbit_queue_index:start_msg_stores(DurableQueues),
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup,
@@ -152,7 +147,7 @@ find_durable_queues() ->
recover_durable_queues(DurableQueues) ->
Qs = lists:foldl(
fun (RecoveredQ, Acc) ->
- Q = start_queue_process(RecoveredQ),
+ Q = start_queue_process(RecoveredQ, false),
%% We need to catch the case where a client
%% connected to another node has deleted the queue
%% (and possibly re-created it).
@@ -166,16 +161,14 @@ recover_durable_queues(DurableQueues) ->
[] -> false
end
end) of
- true ->
- ok = gen_server2:call(Q#amqqueue.pid,
- init_internal_queue,
- infinity),
- [Q|Acc];
+ true -> [Q|Acc];
false -> exit(Q#amqqueue.pid, shutdown),
Acc
end
end, [], DurableQueues),
- [ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs],
+ %% Issue inits to *all* the queues so that they all init at the same time
+ [ok = gen_server2:cast(Q#amqqueue.pid, init_backing_queue) || Q <- Qs],
+ [ok = gen_server2:call(Q#amqqueue.pid, sync) || Q <- Qs],
Qs.
declare(QueueName, Durable, AutoDelete, Args) ->
@@ -183,7 +176,7 @@ declare(QueueName, Durable, AutoDelete, Args) ->
durable = Durable,
auto_delete = AutoDelete,
arguments = Args,
- pid = none}),
+ pid = none}, true),
internal_declare(Q, true).
internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
@@ -198,9 +191,6 @@ internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
true -> add_default_binding(Q);
false -> ok
end,
- ok = gen_server2:call(
- Q#amqqueue.pid,
- init_internal_queue, infinity),
Q;
[_] -> not_found %% existing Q on stopped node
end;
@@ -223,8 +213,9 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
-start_queue_process(Q) ->
- {ok, Pid} = supervisor2:start_child(rabbit_amqqueue_sup, [Q]),
+start_queue_process(Q, InitBackingQueue) ->
+ {ok, Pid} =
+ supervisor2:start_child(rabbit_amqqueue_sup, [Q, InitBackingQueue]),
Q#amqqueue{pid = Pid}.
add_default_binding(#amqqueue{name = QueueName}) ->
@@ -358,8 +349,8 @@ notify_sent(QPid, ChPid) ->
unblock(QPid, ChPid) ->
gen_server2:pcast(QPid, 7, {unblock, ChPid}).
-maybe_run_queue_via_internal_queue(QPid, Fun, Args) ->
- gen_server2:pcast(QPid, 7, {maybe_run_queue_via_internal_queue, Fun, Args}).
+maybe_run_queue_via_backing_queue(QPid, Fun, Args) ->
+ gen_server2:pcast(QPid, 7, {maybe_run_queue_via_backing_queue, Fun, Args}).
flush_all(QPids, ChPid) ->
safe_pmap_ok(
@@ -388,11 +379,11 @@ internal_delete(QueueName) ->
ok
end.
-remeasure_rates(QPid) ->
- gen_server2:pcast(QPid, 8, remeasure_rates).
+update_ram_duration(QPid) ->
+ gen_server2:pcast(QPid, 8, update_ram_duration).
-set_queue_duration(QPid, Duration) ->
- gen_server2:pcast(QPid, 8, {set_queue_duration, Duration}).
+set_ram_duration_target(QPid, Duration) ->
+ gen_server2:pcast(QPid, 8, {set_ram_duration_target, Duration}).
set_maximum_since_use(QPid, Age) ->
gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 94e8662de6..a20cd6c3b1 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -37,9 +37,9 @@
-define(UNSENT_MESSAGE_LIMIT, 100).
-define(SYNC_INTERVAL, 5). %% milliseconds
--define(RATES_REMEASURE_INTERVAL, 5000).
+-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
--export([start_link/1, info_keys/0]).
+-export([start_link/2, info_keys/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1]).
@@ -53,9 +53,9 @@
owner,
exclusive_consumer,
has_had_consumers,
- internal_queue,
- internal_queue_state,
- internal_queue_timeout_fun,
+ backing_queue,
+ backing_queue_state,
+ backing_queue_timeout_fun,
next_msg_id,
active_consumers,
blocked_consumers,
@@ -94,34 +94,34 @@
consumers,
transactions,
memory,
- internal_queue_status
+ backing_queue_status
]).
%%----------------------------------------------------------------------------
-start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
+start_link(Q, InitBackingQueue) ->
+ gen_server2:start_link(?MODULE, [Q, InitBackingQueue], []).
info_keys() -> ?INFO_KEYS.
%%----------------------------------------------------------------------------
-init(Q) ->
+init([Q, InitBQ]) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [self()]),
ok = rabbit_memory_monitor:register
- (self(), {rabbit_amqqueue, set_queue_duration, [self()]}),
- {ok, InternalQueueModule} =
- application:get_env(queue_internal_queue_module),
+ (self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}),
+ {ok, BQ} = application:get_env(backing_queue_module),
{ok, #q{q = Q,
owner = none,
exclusive_consumer = none,
has_had_consumers = false,
- internal_queue = InternalQueueModule,
- internal_queue_state = undefined,
- internal_queue_timeout_fun = undefined,
+ backing_queue = BQ,
+ backing_queue_state = maybe_init_backing_queue(InitBQ, BQ, Q),
+ backing_queue_timeout_fun = undefined,
next_msg_id = 1,
active_consumers = queue:new(),
blocked_consumers = queue:new(),
@@ -129,33 +129,39 @@ init(Q) ->
rate_timer_ref = undefined}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-terminate(shutdown, #q{internal_queue_state = IQS,
- internal_queue = IQ}) ->
+maybe_init_backing_queue(
+ true, BQ, #amqqueue{name = QName, durable = IsDurable}) ->
+ BQ:init(QName, IsDurable);
+maybe_init_backing_queue(false, _BQ, _Q) ->
+ undefined.
+
+terminate(shutdown, #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
ok = rabbit_memory_monitor:deregister(self()),
- case IQS of
+ case BQS of
undefined -> ok;
- _ -> IQ:terminate(IQS)
+ _ -> BQ:terminate(BQS)
end;
-terminate({shutdown, _}, #q{internal_queue_state = IQS,
- internal_queue = IQ}) ->
+terminate({shutdown, _}, #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
ok = rabbit_memory_monitor:deregister(self()),
- case IQS of
+ case BQS of
undefined -> ok;
- _ -> IQ:terminate(IQS)
+ _ -> BQ:terminate(BQS)
end;
-terminate(_Reason, State = #q{internal_queue_state = IQS,
- internal_queue = IQ}) ->
+terminate(_Reason, State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
ok = rabbit_memory_monitor:deregister(self()),
%% FIXME: How do we cancel active subscriptions?
%% Ensure that any persisted tx messages are removed.
%% TODO: wait for all in flight tx_commits to complete
- case IQS of
+ case BQS of
undefined ->
ok;
_ ->
- IQS1 = IQ:tx_rollback(
+ BQS1 = BQ:tx_rollback(
lists:concat([PM || #tx { pending_messages = PM } <-
- all_tx_record()]), IQS),
+ all_tx_record()]), BQS),
%% Delete from disk first. If we crash at this point, when
%% a durable queue, we will be recreated at startup,
%% possibly with partial content. The alternative is much
@@ -163,7 +169,7 @@ terminate(_Reason, State = #q{internal_queue_state = IQS,
%% would then have a race between the disk delete and a
%% new queue with the same name being created and
%% published to.
- IQ:delete_and_terminate(IQS1)
+ BQ:delete_and_terminate(BQS1)
end,
ok = rabbit_amqqueue:internal_delete(qname(State)).
@@ -182,9 +188,9 @@ noreply(NewState) ->
{NewState1, Timeout} = next_state(NewState),
{noreply, NewState1, Timeout}.
-next_state(State = #q{internal_queue_state = IQS,
- internal_queue = IQ}) ->
- next_state1(ensure_rate_timer(State), IQ:needs_sync(IQS)).
+next_state(State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ next_state1(ensure_rate_timer(State), BQ:needs_sync(BQS)).
next_state1(State = #q{sync_timer_ref = undefined}, Callback = {_Fun, _Args}) ->
{start_sync_timer(State, Callback), 0};
@@ -193,11 +199,11 @@ next_state1(State, {_Fun, _Args}) ->
next_state1(State = #q{sync_timer_ref = undefined}, undefined) ->
{State, hibernate};
next_state1(State, undefined) ->
- {stop_sync_timer(State#q{internal_queue_timeout_fun = undefined}), hibernate}.
+ {stop_sync_timer(State#q{backing_queue_timeout_fun = undefined}), hibernate}.
ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
- {ok, TRef} = timer:apply_after(?RATES_REMEASURE_INTERVAL, rabbit_amqqueue,
- remeasure_rates, [self()]),
+ {ok, TRef} = timer:apply_after(?RAM_DURATION_UPDATE_INTERVAL, rabbit_amqqueue,
+ update_ram_duration, [self()]),
State#q{rate_timer_ref = TRef};
ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
State#q{rate_timer_ref = undefined};
@@ -216,16 +222,16 @@ start_sync_timer(State = #q{sync_timer_ref = undefined},
Callback = {Fun, Args}) ->
{ok, TRef} = timer:apply_after(
?SYNC_INTERVAL, rabbit_amqqueue,
- maybe_run_queue_via_internal_queue, [self(), Fun, Args]),
- State#q{sync_timer_ref = TRef, internal_queue_timeout_fun = Callback}.
+ maybe_run_queue_via_backing_queue, [self(), Fun, Args]),
+ State#q{sync_timer_ref = TRef, backing_queue_timeout_fun = Callback}.
stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
- State#q{sync_timer_ref = undefined, internal_queue_timeout_fun = undefined}.
+ State#q{sync_timer_ref = undefined, backing_queue_timeout_fun = undefined}.
-assert_invariant(#q{active_consumers = AC, internal_queue_state = IQS,
- internal_queue = IQ}) ->
- true = (queue:is_empty(AC) orelse IQ:is_empty(IQS)).
+assert_invariant(#q{active_consumers = AC, backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -340,73 +346,73 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) ->
not IsEmpty.
deliver_from_queue_deliver(AckRequired, {false, AutoAcks},
- State = #q{internal_queue_state = IQS,
- internal_queue = IQ}) ->
- {{Message, IsDelivered, AckTag, Remaining}, IQS1} = IQ:fetch(IQS),
+ State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ {{Message, IsDelivered, AckTag, Remaining}, BQS1} = BQ:fetch(BQS),
AutoAcks1 = case AckRequired of
true -> AutoAcks;
false -> [AckTag | AutoAcks]
end,
{{Message, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1},
- State #q { internal_queue_state = IQS1 }}.
+ State #q { backing_queue_state = BQS1 }}.
-run_message_queue(State = #q{internal_queue_state = IQS,
- internal_queue = IQ}) ->
+run_message_queue(State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
Funs = { fun deliver_from_queue_pred/2,
fun deliver_from_queue_deliver/3 },
- IsEmpty = IQ:is_empty(IQS),
+ IsEmpty = BQ:is_empty(BQS),
{{_IsEmpty1, AutoAcks}, State1} =
deliver_msgs_to_consumers(Funs, {IsEmpty, []}, State),
- IQS1 = IQ:ack(AutoAcks, State1 #q.internal_queue_state),
- State1 #q { internal_queue_state = IQS1 }.
+ BQS1 = BQ:ack(AutoAcks, State1 #q.backing_queue_state),
+ State1 #q { backing_queue_state = BQS1 }.
-attempt_delivery(none, _ChPid, Message, State = #q{internal_queue = IQ}) ->
+attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
fun (AckRequired, false, State1) ->
{AckTag, State2} =
case AckRequired of
true ->
- {AckTag1, IQS} =
- IQ:publish_delivered(
- Message, State1 #q.internal_queue_state),
- {AckTag1, State1 #q { internal_queue_state = IQS }};
+ {AckTag1, BQS} =
+ BQ:publish_delivered(
+ Message, State1 #q.backing_queue_state),
+ {AckTag1, State1 #q { backing_queue_state = BQS }};
false ->
{noack, State1}
end,
{{Message, false, AckTag}, true, State2}
end,
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
-attempt_delivery(Txn, ChPid, Message, State = #q{internal_queue = IQ}) ->
- IQS = IQ:tx_publish(Message, State #q.internal_queue_state),
+attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
+ BQS = BQ:tx_publish(Message, State #q.backing_queue_state),
record_pending_message(Txn, ChPid, Message),
- {true, State #q { internal_queue_state = IQS }}.
+ {true, State #q { backing_queue_state = BQS }}.
-deliver_or_enqueue(Txn, ChPid, Message, State = #q{internal_queue = IQ}) ->
+deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
case attempt_delivery(Txn, ChPid, Message, State) of
{true, NewState} ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
- IQS = IQ:publish(Message, State #q.internal_queue_state),
- {false, NewState #q { internal_queue_state = IQS }}
+ BQS = BQ:publish(Message, State #q.backing_queue_state),
+ {false, NewState #q { backing_queue_state = BQS }}
end.
%% all these messages have already been delivered at least once and
%% not ack'd, but need to be either redelivered or requeued
deliver_or_requeue_n([], State) ->
State;
-deliver_or_requeue_n(MsgsWithAcks, State = #q{internal_queue = IQ}) ->
+deliver_or_requeue_n(MsgsWithAcks, State = #q{backing_queue = BQ}) ->
Funs = { fun deliver_or_requeue_msgs_pred/2,
fun deliver_or_requeue_msgs_deliver/3 },
{{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} =
deliver_msgs_to_consumers(
Funs, {length(MsgsWithAcks), [], MsgsWithAcks}, State),
- IQS = IQ:ack(AutoAcks, NewState #q.internal_queue_state),
+ BQS = BQ:ack(AutoAcks, NewState #q.backing_queue_state),
case OutstandingMsgs of
- [] -> NewState #q { internal_queue_state = IQS };
- _ -> IQS1 = IQ:requeue(OutstandingMsgs, IQS),
- NewState #q { internal_queue_state = IQS1 }
+ [] -> NewState #q { backing_queue_state = BQS };
+ _ -> BQS1 = BQ:requeue(OutstandingMsgs, BQS),
+ NewState #q { backing_queue_state = BQS1 }
end.
deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) ->
@@ -518,11 +524,11 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
-maybe_run_queue_via_internal_queue(Fun, Args,
- State = #q{internal_queue_state = IQS,
- internal_queue = IQ}) ->
- {RunQueue, IQS1} = apply(IQ, Fun, Args ++ [IQS]),
- State1 = State#q{internal_queue_state = IQS1},
+maybe_run_queue_via_backing_queue(Fun, Args,
+ State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ {RunQueue, BQS1} = apply(BQ, Fun, Args ++ [BQS]),
+ State1 = State#q{backing_queue_state = BQS1},
case RunQueue of
true -> run_message_queue(State1);
false -> State1
@@ -557,7 +563,7 @@ record_pending_acks(Txn, ChPid, MsgIds) ->
store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending],
ch_pid = ChPid}).
-commit_transaction(Txn, From, State = #q{internal_queue = IQ}) ->
+commit_transaction(Txn, From, State = #q{backing_queue = BQ}) ->
#tx{ch_pid = ChPid, pending_messages = PendingMessages,
pending_acks = PendingAcks} = lookup_tx(Txn),
PendingMessagesOrdered = lists:reverse(PendingMessages),
@@ -572,16 +578,16 @@ commit_transaction(Txn, From, State = #q{internal_queue = IQ}) ->
store_ch_record(C#cr{unacked_messages = Remaining}),
[AckTag || {_Message, AckTag} <- MsgsWithAcks]
end,
- {RunQueue, IQS} = IQ:tx_commit(PendingMessagesOrdered, Acks, From,
- State#q.internal_queue_state),
+ {RunQueue, BQS} = BQ:tx_commit(PendingMessagesOrdered, Acks, From,
+ State#q.backing_queue_state),
erase_tx(Txn),
- {RunQueue, State#q{internal_queue_state = IQS}}.
+ {RunQueue, State#q{backing_queue_state = BQS}}.
-rollback_transaction(Txn, State = #q{internal_queue = IQ}) ->
+rollback_transaction(Txn, State = #q{backing_queue = BQ}) ->
#tx{pending_messages = PendingMessages} = lookup_tx(Txn),
- IQS = IQ:tx_rollback(PendingMessages, State #q.internal_queue_state),
+ BQS = BQ:tx_rollback(PendingMessages, State #q.backing_queue_state),
erase_tx(Txn),
- State#q{internal_queue_state = IQS}.
+ State#q{backing_queue_state = BQS}.
collect_messages(MsgIds, UAM) ->
lists:mapfoldl(
@@ -608,8 +614,8 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = none}) ->
'';
i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
ConsumerTag;
-i(messages_ready, #q{internal_queue_state = IQS, internal_queue = IQ}) ->
- IQ:len(IQS);
+i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
+ BQ:len(BQS);
i(messages_unacknowledged, _) ->
lists:sum([dict:size(UAM) ||
#cr{unacked_messages = UAM} <- all_ch_record()]);
@@ -630,26 +636,13 @@ i(transactions, _) ->
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
-i(internal_queue_status, #q{internal_queue_state = IQS, internal_queue = IQ}) ->
- IQ:status(IQS);
+i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
+ BQ:status(BQS);
i(Item, _) ->
throw({bad_argument, Item}).
%---------------------------------------------------------------------------
-handle_call(init_internal_queue, From, State =
- #q{internal_queue_state = undefined, internal_queue = IQ,
- q = #amqqueue{name = QName, durable = IsDurable}}) ->
- gen_server2:reply(From, ok),
- PersistentStore = case IsDurable of
- true -> ?PERSISTENT_MSG_STORE;
- false -> ?TRANSIENT_MSG_STORE
- end,
- noreply(State#q{internal_queue_state = IQ:init(QName, PersistentStore)});
-
-handle_call(init_internal_queue, _From, State) ->
- reply(ok, State);
-
handle_call(sync, _From, State) ->
reply(ok, State);
@@ -713,24 +706,24 @@ handle_call({notify_down, ChPid}, _From, State) ->
handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName}, next_msg_id = NextId,
- internal_queue_state = IQS, internal_queue = IQ}) ->
- case IQ:fetch(IQS) of
- {empty, IQS1} -> reply(empty, State #q { internal_queue_state = IQS1 });
- {{Message, IsDelivered, AckTag, Remaining}, IQS1} ->
+ backing_queue_state = BQS, backing_queue = BQ}) ->
+ case BQ:fetch(BQS) of
+ {empty, BQS1} -> reply(empty, State #q { backing_queue_state = BQS1 });
+ {{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
AckRequired = not(NoAck),
- IQS2 =
+ BQS2 =
case AckRequired of
true ->
C = #cr{unacked_messages = UAM} = ch_record(ChPid),
NewUAM = dict:store(NextId, {Message, AckTag}, UAM),
store_ch_record(C#cr{unacked_messages = NewUAM}),
- IQS1;
+ BQS1;
false ->
- IQ:ack([AckTag], IQS1)
+ BQ:ack([AckTag], BQS1)
end,
Msg = {QName, self(), NextId, IsDelivered, Message},
reply({ok, Remaining, Msg},
- State #q { next_msg_id = NextId + 1, internal_queue_state = IQS2 })
+ State #q { next_msg_id = NextId + 1, backing_queue_state = BQS2 })
end;
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
@@ -810,14 +803,14 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end;
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
- internal_queue_state = IQS,
- internal_queue = IQ,
+ backing_queue_state = BQS,
+ backing_queue = BQ,
active_consumers = ActiveConsumers}) ->
- reply({ok, Name, IQ:len(IQS), queue:len(ActiveConsumers)}, State);
+ reply({ok, Name, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
- State = #q{internal_queue_state = IQS, internal_queue = IQ}) ->
- Length = IQ:len(IQS),
+ State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
+ Length = BQ:len(BQS),
IsEmpty = Length == 0,
IsUnused = is_unused(State),
if
@@ -829,9 +822,9 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
{stop, normal, {ok, Length}, State}
end;
-handle_call(purge, _From, State = #q{internal_queue = IQ}) ->
- {Count, IQS} = IQ:purge(State#q.internal_queue_state),
- reply({ok, Count}, State#q{internal_queue_state = IQS});
+handle_call(purge, _From, State = #q{backing_queue = BQ}) ->
+ {Count, BQS} = BQ:purge(State#q.backing_queue_state),
+ reply({ok, Count}, State#q{backing_queue_state = BQS});
handle_call({claim_queue, ReaderPid}, _From,
State = #q{owner = Owner, exclusive_consumer = Holder}) ->
@@ -856,12 +849,21 @@ handle_call({claim_queue, ReaderPid}, _From,
reply(locked, State)
end.
+
+handle_cast(init_backing_queue, State = #q{backing_queue_state = undefined,
+ backing_queue = BQ, q = Q}) ->
+ noreply(State#q{backing_queue_state =
+ maybe_init_backing_queue(true, BQ, Q)});
+
+handle_cast(init_backing_queue, State) ->
+ noreply(State);
+
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
noreply(NewState);
-handle_cast({ack, Txn, MsgIds, ChPid}, State = #q{internal_queue = IQ}) ->
+handle_cast({ack, Txn, MsgIds, ChPid}, State = #q{backing_queue = BQ}) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
@@ -869,10 +871,10 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State = #q{internal_queue = IQ}) ->
case Txn of
none ->
{MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM),
- IQS = IQ:ack([AckTag || {_Message, AckTag} <- MsgWithAcks],
- State #q.internal_queue_state),
+ BQS = BQ:ack([AckTag || {_Message, AckTag} <- MsgWithAcks],
+ State #q.backing_queue_state),
store_ch_record(C#cr{unacked_messages = Remaining}),
- noreply(State #q { internal_queue_state = IQS });
+ noreply(State #q { backing_queue_state = BQS });
_ ->
record_pending_acks(Txn, ChPid, MsgIds),
noreply(State)
@@ -906,8 +908,8 @@ handle_cast({notify_sent, ChPid}, State) ->
C#cr{unsent_message_count = Count - 1}
end));
-handle_cast({maybe_run_queue_via_internal_queue, Fun, Args}, State) ->
- noreply(maybe_run_queue_via_internal_queue(Fun, Args, State));
+handle_cast({maybe_run_queue_via_backing_queue, Fun, Args}, State) ->
+ noreply(maybe_run_queue_via_backing_queue(Fun, Args, State));
handle_cast({limit, ChPid, LimiterPid}, State) ->
noreply(
@@ -929,21 +931,21 @@ handle_cast({flush, ChPid}, State) ->
ok = rabbit_channel:flushed(ChPid, self()),
noreply(State);
-handle_cast(remeasure_rates, State = #q{internal_queue_state = IQS,
- internal_queue = IQ}) ->
- IQS1 = IQ:remeasure_rates(IQS),
- RamDuration = IQ:queue_duration(IQS1),
+handle_cast(update_ram_duration, State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ BQS1 = BQ:update_ram_duration(BQS),
+ RamDuration = BQ:ram_duration(BQS1),
DesiredDuration =
- rabbit_memory_monitor:report_queue_duration(self(), RamDuration),
- IQS2 = IQ:set_queue_duration_target(DesiredDuration, IQS1),
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
noreply(State#q{rate_timer_ref = just_measured,
- internal_queue_state = IQS2});
+ backing_queue_state = BQS2});
-handle_cast({set_queue_duration, Duration},
- State = #q{internal_queue_state = IQS,
- internal_queue = IQ}) ->
- IQS1 = IQ:set_queue_duration_target(Duration, IQS),
- noreply(State#q{internal_queue_state = IQS1});
+handle_cast({set_ram_duration_target, Duration},
+ State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ BQS1 = BQ:set_ram_duration_target(Duration, BQS),
+ noreply(State#q{backing_queue_state = BQS1});
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -968,12 +970,12 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
{stop, NewState} -> {stop, normal, NewState}
end;
-handle_info(timeout, State = #q{internal_queue_timeout_fun = undefined}) ->
+handle_info(timeout, State = #q{backing_queue_timeout_fun = undefined}) ->
noreply(State);
-handle_info(timeout, State = #q{internal_queue_timeout_fun = {Fun, Args}}) ->
- noreply(maybe_run_queue_via_internal_queue(
- Fun, Args, State#q{internal_queue_timeout_fun = undefined}));
+handle_info(timeout, State = #q{backing_queue_timeout_fun = {Fun, Args}}) ->
+ noreply(maybe_run_queue_via_backing_queue(
+ Fun, Args, State#q{backing_queue_timeout_fun = undefined}));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
@@ -982,11 +984,11 @@ handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
-handle_pre_hibernate(State = #q{internal_queue_state = IQS,
- internal_queue = IQ}) ->
- IQS1 = IQ:handle_pre_hibernate(IQS),
+handle_pre_hibernate(State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ BQS1 = BQ:handle_pre_hibernate(BQS),
%% no activity for a while == 0 egress and ingress rates
DesiredDuration =
- rabbit_memory_monitor:report_queue_duration(self(), infinity),
- IQS2 = IQ:set_queue_duration_target(DesiredDuration, IQS1),
- {hibernate, stop_rate_timer(State#q{internal_queue_state = IQS2})}.
+ rabbit_memory_monitor:report_ram_duration(self(), infinity),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ {hibernate, stop_rate_timer(State#q{backing_queue_state = BQS2})}.
diff --git a/src/rabbit_internal_queue_type.erl b/src/rabbit_backing_queue_type.erl
index 48d9314df2..46299d0207 100644
--- a/src/rabbit_internal_queue_type.erl
+++ b/src/rabbit_backing_queue_type.erl
@@ -29,14 +29,14 @@
%% Contributor(s): ______________________________________.
%%
--module(rabbit_internal_queue_type).
+-module(rabbit_backing_queue_type).
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
[
- %% Called with queue name and the persistent msg_store to
- %% use. Transient store is in ?TRANSIENT_MSG_STORE
+ %% Called with queue name and a boolean to indicate whether or
+ %% not the queue is durable.
{init, 2},
%% Called on queue shutdown when queue isn't being deleted
@@ -58,27 +58,52 @@ behaviour_info(callbacks) ->
%% (i.e. saves the round trip through the internal queue).
{publish_delivered, 2},
+ %% Produce the next message
{fetch, 1},
+ %% Acktags supplied are for messages which can now be forgotten
+ %% about
{ack, 2},
+ %% A publish, but in the context of a transaction.
{tx_publish, 2},
+
+ %% Undo anything which has been done by the tx_publish of the
+ %% indicated messages.
{tx_rollback, 2},
+
+ %% Commit these publishes and acktags. The publishes you will
+ %% have previously seen in calls to tx_publish.
{tx_commit, 4},
%% Reinsert messages into the queue which have already been
%% delivered and were (likely) pending acks.q
{requeue, 2},
+ %% How long is my queue?
{len, 1},
+ %% Is my queue empty?
{is_empty, 1},
- {set_queue_duration_target, 2},
+ %% For the next three functions, the assumption is that you're
+ %% monitoring something like the ingress and egress rates of the
+ %% queue. The RAM duration is thus the length of time represented
+ %% by the messages held in RAM given the current rates. If you
+ %% want to ignore all of this stuff, then do so, and return 0 in
+ %% ram_duration/1.
+
+ %% The target is to have no more messages in RAM than indicated
+ %% by the duration and the current queue rates.
+ {set_ram_duration_target, 2},
- {remeasure_rates, 1},
+ %% Recalculate the duration internally (likely to be just update
+ %% your internal rates).
+ {update_ram_duration, 1},
- {queue_duration, 1},
+ %% Report how many seconds the messages in RAM represent given
+ %% the current rates of the queue.
+ {ram_duration, 1},
%% Can return 'undefined' or a function atom name plus list of
%% arguments to be invoked in the internal queue module as soon
@@ -90,7 +115,7 @@ behaviour_info(callbacks) ->
{handle_pre_hibernate, 1},
%% Exists for debugging purposes, to be able to expose state via
- %% rabbitmqctl list_queues internal_queue_status
+ %% rabbitmqctl list_queues backing_queue_status
{status, 1}
];
behaviour_info(_Other) ->
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index a76600fe15..91e97ffe49 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -40,7 +40,7 @@
-behaviour(gen_server2).
-export([start_link/0, update/0, register/2, deregister/1,
- report_queue_duration/2, stop/0]).
+ report_ram_duration/2, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -90,7 +90,7 @@
-spec(update/0 :: () -> 'ok').
-spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok').
-spec(deregister/1 :: (pid()) -> 'ok').
--spec(report_queue_duration/2 :: (pid(), float() | 'infinity') -> number()).
+-spec(report_ram_duration/2 :: (pid(), float() | 'infinity') -> number()).
-spec(stop/0 :: () -> 'ok').
-endif.
@@ -111,9 +111,9 @@ register(Pid, MFA = {_M, _F, _A}) ->
deregister(Pid) ->
gen_server2:cast(?SERVER, {deregister, Pid}).
-report_queue_duration(Pid, QueueDuration) ->
+report_ram_duration(Pid, QueueDuration) ->
gen_server2:call(?SERVER,
- {report_queue_duration, Pid, QueueDuration}, infinity).
+ {report_ram_duration, Pid, QueueDuration}, infinity).
stop() ->
gen_server2:cast(?SERVER, stop).
@@ -143,7 +143,7 @@ init([]) ->
memory_limit = MemoryLimit,
desired_duration = infinity })}.
-handle_call({report_queue_duration, Pid, QueueDuration}, From,
+handle_call({report_ram_duration, Pid, QueueDuration}, From,
State = #state { queue_duration_sum = Sum,
queue_duration_count = Count,
queue_durations = Durations,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 340f308f55..6d5ab2f0af 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -60,6 +60,7 @@
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
-export([recursive_delete/1, dict_cons/3, unlink_and_capture_exit/1]).
+-export([geometric/1]).
-import(mnesia).
-import(lists).
@@ -129,14 +130,18 @@
-spec(start_applications/1 :: ([atom()]) -> 'ok').
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
--spec(ceil/1 :: (number()) -> number()).
+-spec(ceil/1 :: (number()) -> integer()).
-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
-spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()).
-spec(pid_to_string/1 :: (pid()) -> string()).
-spec(string_to_pid/1 :: (string()) -> pid()).
+-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt').
+-spec(version_compare/3 :: (string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) ->
+ boolean()).
-spec(recursive_delete/1 :: (string()) -> 'ok' | {'error', any()}).
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok').
+-spec(geometric/1 :: (float()) -> non_neg_integer()).
-endif.
@@ -636,3 +641,7 @@ unlink_and_capture_exit(Pid) ->
receive {'EXIT', Pid, _} -> ok
after 0 -> ok
end.
+
+geometric(P) when 0.0 < P andalso P < 1.0 ->
+ U = 1.0 - random:uniform(),
+ ceil(math:log(U) / math:log(1.0 - P)).
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index 267cb633d8..2c7ea89302 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -46,7 +46,7 @@
%%----------------------------------------------------------------------------
--include("rabbit.hrl").
+-include("rabbit_msg_store.hrl").
-ifdef(use_specs).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b2db0ea51a..2af16bc1cc 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1444,7 +1444,7 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
file_summary_ets = FileSummaryEts })
when (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
First = ets:first(FileSummaryEts),
- N = random_distributions:geometric(?GEOMETRIC_P),
+ N = rabbit_misc:geometric(?GEOMETRIC_P),
case find_files_to_gc(FileSummaryEts, N, First) of
undefined ->
State;
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index f5f49cf4f4..f7f265afe2 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -35,7 +35,7 @@
write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1,
read_segment_entries/2, next_segment_boundary/1, segment_size/0,
find_lowest_seq_id_seg_and_next_seq_id/1,
- start_persistent_msg_store/1]).
+ start_msg_stores/1]).
-export([queue_index_walker_reader/3]). %% for internal use only
@@ -172,6 +172,7 @@
}).
-include("rabbit.hrl").
+-include("rabbit_variable_queue.hrl").
%%----------------------------------------------------------------------------
@@ -210,7 +211,7 @@
-spec(segment_size/0 :: () -> non_neg_integer()).
-spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
--spec(start_persistent_msg_store/1 :: ([amqqueue()]) -> 'ok').
+-spec(start_msg_stores/1 :: ([amqqueue()]) -> 'ok').
-endif.
@@ -427,7 +428,12 @@ find_lowest_seq_id_seg_and_next_seq_id(State) ->
end,
{LowSeqIdSeg, NextSeqId, State}.
-start_persistent_msg_store(DurableQueues) ->
+start_msg_stores(DurableQueues) ->
+ ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()),
+ ok = rabbit_sup:start_child(
+ ?TRANSIENT_MSG_STORE, rabbit_msg_store,
+ [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined,
+ fun (ok) -> finished end, ok]),
DurableDict =
dict:from_list([ {queue_name_to_dir_name(Queue #amqqueue.name),
Queue #amqqueue.name} || Queue <- DurableQueues ]),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a97730e0b1..29699829dc 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -41,6 +41,7 @@
-import(lists).
-include("rabbit.hrl").
+-include("rabbit_variable_queue.hrl").
-include_lib("kernel/include/file.hrl").
test_content_prop_roundtrip(Datum, Binary) ->
@@ -1203,8 +1204,7 @@ test_amqqueue(Durable) ->
pid = none}.
empty_test_queue() ->
- ok = start_transient_msg_store(),
- ok = rabbit_queue_index:start_persistent_msg_store([]),
+ ok = rabbit_queue_index:start_msg_stores([]),
{0, _PRef, _TRef, _Terms, Qi1} = rabbit_queue_index:init(test_queue(), false),
_Qi2 = rabbit_queue_index:terminate_and_erase(Qi1),
ok.
@@ -1266,8 +1266,7 @@ test_queue_index() ->
%% call terminate twice to prove it's idempotent
_Qi5 = rabbit_queue_index:terminate([], rabbit_queue_index:terminate([], Qi4)),
ok = stop_msg_store(),
- ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]),
- ok = start_transient_msg_store(),
+ ok = rabbit_queue_index:start_msg_stores([test_amqqueue(true)]),
%% should get length back as 0, as all the msgs were transient
{0, _PRef1, _TRef1, _Terms1, Qi6} = rabbit_queue_index:init(test_queue(), false),
{0, 0, Qi7} =
@@ -1280,8 +1279,7 @@ test_queue_index() ->
lists:reverse(SeqIdsMsgIdsB)),
_Qi11 = rabbit_queue_index:terminate([], Qi10),
ok = stop_msg_store(),
- ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]),
- ok = start_transient_msg_store(),
+ ok = rabbit_queue_index:start_msg_stores([test_amqqueue(true)]),
%% should get length back as 10000
LenB = length(SeqIdsB),
{LenB, _PRef2, _TRef2, _Terms2, Qi12} = rabbit_queue_index:init(test_queue(), false),
@@ -1298,8 +1296,7 @@ test_queue_index() ->
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17),
_Qi19 = rabbit_queue_index:terminate([], Qi18),
ok = stop_msg_store(),
- ok = rabbit_queue_index:start_persistent_msg_store([test_amqqueue(true)]),
- ok = start_transient_msg_store(),
+ ok = rabbit_queue_index:start_msg_stores([test_amqqueue(true)]),
%% should get length back as 0 because all persistent msgs have been acked
{0, _PRef3, _TRef3, _Terms3, Qi20} = rabbit_queue_index:init(test_queue(), false),
_Qi21 = rabbit_queue_index:terminate_and_erase(Qi20),
@@ -1340,8 +1337,7 @@ test_queue_index() ->
Qi40 = queue_index_flush_journal(Qi39),
_Qi41 = rabbit_queue_index:terminate_and_erase(Qi40),
ok = stop_msg_store(),
- ok = rabbit_queue_index:start_persistent_msg_store([]),
- ok = start_transient_msg_store(),
+ ok = rabbit_queue_index:start_msg_stores([]),
ok = stop_msg_store(),
passed.
@@ -1370,7 +1366,7 @@ assert_prop(List, Prop, Value) ->
fresh_variable_queue() ->
stop_msg_store(),
ok = empty_test_queue(),
- VQ = rabbit_variable_queue:init(test_queue(), ?PERSISTENT_MSG_STORE),
+ VQ = rabbit_variable_queue:init(test_queue(), true),
S0 = rabbit_variable_queue:status(VQ),
assert_prop(S0, len, 0),
assert_prop(S0, q1, 0),
@@ -1391,7 +1387,7 @@ test_variable_queue_dynamic_duration_change() ->
%% start by sending in a couple of segments worth
Len1 = 2*SegmentSize,
VQ1 = variable_queue_publish(false, Len1, VQ0),
- VQ2 = rabbit_variable_queue:remeasure_rates(VQ1),
+ VQ2 = rabbit_variable_queue:update_ram_duration(VQ1),
{ok, _TRef} = timer:send_after(1000, {duration, 60,
fun (V) -> (V*0.75)-1 end}),
VQ3 = test_variable_queue_dynamic_duration_change_f(Len1, VQ2),
@@ -1427,9 +1423,9 @@ test_variable_queue_dynamic_duration_change_f(Len, VQ0) ->
_ -> Fun
end,
{ok, _TRef} = timer:send_after(1000, {duration, N1, Fun1}),
- VQ4 = rabbit_variable_queue:remeasure_rates(VQ3),
+ VQ4 = rabbit_variable_queue:update_ram_duration(VQ3),
VQ5 = %% /37 otherwise the duration is just to high to stress things
- rabbit_variable_queue:set_queue_duration_target(N/37, VQ4),
+ rabbit_variable_queue:set_ram_duration_target(N/37, VQ4),
io:format("~p:~n~p~n~n", [N, rabbit_variable_queue:status(VQ5)]),
test_variable_queue_dynamic_duration_change_f(Len, VQ5)
after 0 ->
@@ -1441,8 +1437,8 @@ test_variable_queue_partial_segments_delta_thing() ->
HalfSegment = SegmentSize div 2,
VQ0 = fresh_variable_queue(),
VQ1 = variable_queue_publish(true, SegmentSize + HalfSegment, VQ0),
- VQ2 = rabbit_variable_queue:remeasure_rates(VQ1),
- VQ3 = rabbit_variable_queue:set_queue_duration_target(0, VQ2),
+ VQ2 = rabbit_variable_queue:update_ram_duration(VQ1),
+ VQ3 = rabbit_variable_queue:set_ram_duration_target(0, VQ2),
%% one segment in q3 as betas, and half a segment in delta
S3 = rabbit_variable_queue:status(VQ3),
io:format("~p~n", [S3]),
@@ -1450,7 +1446,7 @@ test_variable_queue_partial_segments_delta_thing() ->
SegmentSize + HalfSegment}),
assert_prop(S3, q3, SegmentSize),
assert_prop(S3, len, SegmentSize + HalfSegment),
- VQ4 = rabbit_variable_queue:set_queue_duration_target(infinity, VQ3),
+ VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(true, 1, VQ4),
%% should have 1 alpha, but it's in the same segment as the deltas
S5 = rabbit_variable_queue:status(VQ5),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c01ab5a450..b798a2c9c4 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -32,8 +32,8 @@
-module(rabbit_variable_queue).
-export([init/2, terminate/1, publish/2, publish_delivered/2,
- set_queue_duration_target/2, remeasure_rates/1,
- queue_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1,
+ set_ram_duration_target/2, update_ram_duration/1,
+ ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1,
delete_and_terminate/1, requeue/2, tx_publish/2, tx_rollback/2,
tx_commit/4, needs_sync/1, handle_pre_hibernate/1, status/1]).
@@ -133,7 +133,7 @@
%%----------------------------------------------------------------------------
--behaviour(rabbit_internal_queue_type).
+-behaviour(rabbit_backing_queue_type).
-record(vqstate,
{ q1,
@@ -189,6 +189,7 @@
-define(RAM_INDEX_BATCH_SIZE, 64).
-include("rabbit.hrl").
+-include("rabbit_variable_queue.hrl").
%%----------------------------------------------------------------------------
@@ -236,7 +237,7 @@
{boolean(), state()}).
-spec(tx_commit_index/1 :: (state()) -> {boolean(), state()}).
--include("rabbit_internal_queue_type_spec.hrl").
+-include("rabbit_backing_queue_type_spec.hrl").
-endif.
@@ -251,7 +252,11 @@
%% Public API
%%----------------------------------------------------------------------------
-init(QueueName, PersistentStore) ->
+init(QueueName, IsDurable) ->
+ PersistentStore = case IsDurable of
+ true -> ?PERSISTENT_MSG_STORE;
+ false -> ?TRANSIENT_MSG_STORE
+ end,
MsgStoreRecovered =
rabbit_msg_store:successfully_recovered_state(PersistentStore),
{DeltaCount, PRef, TRef, Terms, IndexState} =
@@ -344,7 +349,7 @@ publish_delivered(Msg = #basic_message { guid = MsgId,
{ack_not_on_disk, State2}
end.
-set_queue_duration_target(
+set_ram_duration_target(
DurationTarget, State = #vqstate { avg_egress_rate = AvgEgressRate,
avg_ingress_rate = AvgIngressRate,
target_ram_msg_count = TargetRamMsgCount
@@ -364,18 +369,18 @@ set_queue_duration_target(
false -> reduce_memory_use(State1)
end.
-remeasure_rates(State = #vqstate { egress_rate = Egress,
- ingress_rate = Ingress,
- rate_timestamp = Timestamp,
- in_counter = InCount,
- out_counter = OutCount,
- ram_msg_count = RamMsgCount,
- duration_target = DurationTarget }) ->
+update_ram_duration(State = #vqstate { egress_rate = Egress,
+ ingress_rate = Ingress,
+ rate_timestamp = Timestamp,
+ in_counter = InCount,
+ out_counter = OutCount,
+ ram_msg_count = RamMsgCount,
+ duration_target = DurationTarget }) ->
Now = now(),
{AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
{AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
- set_queue_duration_target(
+ set_ram_duration_target(
DurationTarget,
State #vqstate { egress_rate = Egress1,
avg_egress_rate = AvgEgressRate,
@@ -385,7 +390,7 @@ remeasure_rates(State = #vqstate { egress_rate = Egress,
ram_msg_count_prev = RamMsgCount,
out_counter = 0, in_counter = 0 }).
-queue_duration(#vqstate { avg_egress_rate = AvgEgressRate,
+ram_duration(#vqstate { avg_egress_rate = AvgEgressRate,
avg_ingress_rate = AvgIngressRate,
ram_msg_count = RamMsgCount,
ram_msg_count_prev = RamMsgCountPrev }) ->
@@ -594,7 +599,7 @@ tx_commit(Pubs, AckTags, From, State =
Self = self(),
ok = rabbit_msg_store:sync(
?PERSISTENT_MSG_STORE, PersistentMsgIds,
- fun () -> ok = rabbit_amqqueue:maybe_run_queue_via_internal_queue(
+ fun () -> ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue(
Self, tx_commit_post_msg_store,
[IsTransientPubs, Pubs, AckTags, From])
end),
diff --git a/src/random_distributions.erl b/src/random_distributions.erl
deleted file mode 100644
index 0f7d115cac..0000000000
--- a/src/random_distributions.erl
+++ /dev/null
@@ -1,38 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(random_distributions).
-
--export([geometric/1]).
-
-geometric(P) when 0.0 < P andalso P < 1.0 ->
- U = 1.0 - random:uniform(),
- rabbit_misc:ceil(math:log(U) / math:log(1.0 - P)).