summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-16 18:08:41 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-16 18:08:41 +0100
commit563b889cabb7a5877fe2b0f18628a56b035976fe (patch)
tree0c275b0bd774ae21dbfcd1888ec94201c7b5b1ce
parentd64d8b81013ff78ca24381d38f5cdfa2e0a76765 (diff)
downloadrabbitmq-server-563b889cabb7a5877fe2b0f18628a56b035976fe.tar.gz
Well, it's better. The memory size is now recovered at startup by doing a foldl over the entire queue. This seems excessive, but it does work. It only takes 75 seconds on my machine to get through 1e6 1024-byte messages, and 160 seconds to get through 2e6 1024-byte messages, so that doesn't worry me any more. Also, it's done in constant memory... ish[0].
Also fixed the queue_mode_manager. Registration no longer produces a mode. Instead, it assumes you're starting up in disk-only mode, and then the first memory_report will result in the correct mode being set. This is safe and prevents a potentially deadly prefetch being sent when a queue starts up in mixed mode only to be sent to disk_only mode. However, the disk_queue has to start up in mixed mode, because otherwise it has no way to estimate its memory use for disk mode. As such, it registers and then sends a report of 0 memory use. This guarantees that it can be put in mixed mode, so it can then respond as necessary to the queue_mode_manager.

I've not done anything further at this stage with the use of the erlang queue in the mixed_queue module when in disk mode (the potential per-message cost). Really, you don't want to send individual entries to the disk_queue here; you want to batch them up... which makes this rather more complex.

[0] Sort of wrong. It can use the cache, and if you think about not-too-big queues sharing messages, this is clearly a good thing. But if there are lots of shared messages then it all goes wrong, because the cache will get overpopulated and exhaust memory. Furthermore, the foldl runs entirely in the disk_queue process. This means that during the foldl it won't be reporting memory and it won't be able to respond to requests to change its mode. All of which points pretty strongly to the requirement that the prefetch needs to be somewhat more sophisticated.
-rw-r--r--src/rabbit_amqqueue_process.erl37
-rw-r--r--src/rabbit_disk_queue.erl40
-rw-r--r--src/rabbit_mixed_queue.erl33
-rw-r--r--src/rabbit_queue_mode_manager.erl34
-rw-r--r--src/rabbit_tests.erl23
5 files changed, 94 insertions, 73 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a1b5a895..0597215f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -100,19 +100,22 @@ start_link(Q) ->
init(Q = #amqqueue { name = QName, durable = Durable }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
- {ok, Mode} = rabbit_queue_mode_manager:register
- (self(), rabbit_amqqueue, set_mode, [self()]),
- {ok, MS} = rabbit_mixed_queue:init(QName, Durable, Mode),
- {ok, #q{q = Q,
- owner = none,
- exclusive_consumer = none,
- has_had_consumers = false,
- mixed_state = MS,
- next_msg_id = 1,
- active_consumers = queue:new(),
- blocked_consumers = queue:new(),
- memory_report_timer = start_memory_timer()
- }, {binary, ?HIBERNATE_AFTER_MIN}}.
+ ok = rabbit_queue_mode_manager:register
+ (self(), rabbit_amqqueue, set_mode, [self()]),
+ {ok, MS} = rabbit_mixed_queue:init(QName, Durable),
+ State = #q{q = Q,
+ owner = none,
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ mixed_state = MS,
+ next_msg_id = 1,
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new(),
+ memory_report_timer = start_memory_timer()
+ },
+ %% first thing we must do is report_memory which will clear out
+ %% the 'undefined' values in gain and loss in mixed_queue state
+ {ok, report_memory(false, State), {binary, ?HIBERNATE_AFTER_MIN}}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -553,14 +556,10 @@ i(memory, _) ->
i(Item, _) ->
throw({bad_argument, Item}).
-report_memory(Hibernating, State = #q { mixed_state = MS }) ->
+report_memory(Hib, State = #q { mixed_state = MS }) ->
{MSize, Gain, Loss} =
rabbit_mixed_queue:estimate_queue_memory(MS),
- NewMem = case MSize of
- 0 -> 1; %% avoid / 0
- N -> N
- end,
- rabbit_queue_mode_manager:report_memory(self(), NewMem, Gain, Loss, Hibernating),
+ rabbit_queue_mode_manager:report_memory(self(), MSize, Gain, Loss, Hib),
State #q { mixed_state = rabbit_mixed_queue:reset_counters(MS) }.
%---------------------------------------------------------------------------
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 8b148777..868eab4a 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -42,7 +42,7 @@
tx_publish/1, tx_commit/3, tx_cancel/1,
requeue/2, purge/1, delete_queue/1,
delete_non_durable_queues/1, auto_ack_next_message/1,
- requeue_next_n/2, prefetch/2, length/1
+ requeue_next_n/2, prefetch/2, length/1, foldl/3
]).
-export([filesync/0, cache_info/0]).
@@ -266,6 +266,9 @@
-spec(delete_queue/1 :: (queue_name()) -> 'ok').
-spec(delete_non_durable_queues/1 :: (set()) -> 'ok').
-spec(length/1 :: (queue_name()) -> non_neg_integer()).
+-spec(foldl/3 :: (fun (({message(), non_neg_integer(),
+ bool(), {msg_id(), seq_id()}}, A) ->
+ A), A, queue_name()) -> A).
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
-spec(to_disk_only_mode/0 :: () -> 'ok').
@@ -328,6 +331,9 @@ delete_non_durable_queues(DurableQueues) ->
length(Q) ->
gen_server2:call(?SERVER, {length, Q}, infinity).
+foldl(Fun, Init, Acc) ->
+ gen_server2:call(?SERVER, {foldl, Fun, Init, Acc}, infinity).
+
stop() ->
gen_server2:call(?SERVER, stop, infinity).
@@ -367,8 +373,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% brutal_kill.
%% Otherwise, the gen_server will be immediately terminated.
process_flag(trap_exit, true),
- {ok, Mode} = rabbit_queue_mode_manager:register
- (self(), rabbit_disk_queue, set_mode, []),
+ ok = rabbit_queue_mode_manager:register
+ (self(), rabbit_disk_queue, set_mode, []),
Node = node(),
ok =
case mnesia:change_table_copy_type(rabbit_disk_queue, Node,
@@ -440,10 +446,13 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
ok = preallocate(FileHdl, FileSizeLimit, Offset)
end,
State2 = State1 #dqstate { current_file_handle = FileHdl },
- {ok, case Mode of
- mixed -> State2;
- disk -> to_disk_only_mode(State2)
- end, {binary, ?HIBERNATE_AFTER_MIN}, 0}.
+ %% by reporting a memory use of 0, we guarantee the manager will
+ %% grant us to ram_disk mode. We have to start in ram_disk mode
+ %% because we can't find values for mnesia_bytes_per_record or
+ %% ets_bytes_per_record otherwise.
+ ok = rabbit_queue_mode_manager:report_memory(self(), 0, false),
+ ok = report_memory(false, State2),
+ {ok, State2, {binary, ?HIBERNATE_AFTER_MIN}, 0}.
handle_call({deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, true, false, State),
@@ -464,6 +473,9 @@ handle_call({purge, Q}, _From, State) ->
handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
reply(WriteSeqId - ReadSeqId, State);
+handle_call({foldl, Fun, Init, Q}, _From, State) ->
+ {ok, Result, State1} = internal_foldl(Q, Fun, Init, State),
+ reply(Result, State1);
handle_call(stop, _From, State) ->
{stop, normal, ok, State}; %% gen_server now calls terminate
handle_call(stop_vaporise, _From, State) ->
@@ -588,7 +600,7 @@ start_memory_timer() ->
TRef.
start_memory_timer(State = #dqstate { memory_report_timer = undefined }) ->
- report_memory(false, State),
+ ok = report_memory(false, State),
State #dqstate { memory_report_timer = start_memory_timer() };
start_memory_timer(State) ->
State.
@@ -899,6 +911,18 @@ internal_prefetch(Q, Count, State = #dqstate { sequences = Sequences }) ->
end, State, lists:seq(ReadSeqId, ReadSeqId + Count1 - 1)),
{ok, StateN}.
+internal_foldl(Q, Fun, Init, State = #dqstate { sequences = Sequences }) ->
+ {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
+ internal_foldl(Q, WriteSeqId, Fun, State, Init, ReadSeqId).
+
+internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) ->
+ {ok, Acc, State};
+internal_foldl(Q, WriteSeqId, Fun, State, Acc, ReadSeqId) ->
+ {ok, MsgStuff, State1}
+ = internal_read_message(Q, ReadSeqId, true, true, false, State),
+ Acc1 = Fun(MsgStuff, Acc),
+ internal_foldl(Q, WriteSeqId, Fun, State1, Acc1, ReadSeqId + 1).
+
internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) ->
[Obj =
#dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] =
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index a9013f3d..d864d9b2 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
--export([init/3]).
+-export([init/2]).
-export([publish/2, publish_delivered/2, deliver/1, ack/2,
tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1,
@@ -70,7 +70,7 @@
-type(acktag() :: ( 'noack' | { non_neg_integer(), non_neg_integer() })).
-type(okmqs() :: {'ok', mqstate()}).
--spec(init/3 :: (queue_name(), bool(), mode()) -> okmqs()).
+-spec(init/2 :: (queue_name(), bool()) -> okmqs()).
-spec(publish/2 :: (message(), mqstate()) -> okmqs()).
-spec(publish_delivered/2 :: (message(), mqstate()) ->
{'ok', acktag(), mqstate()}).
@@ -99,16 +99,18 @@
-endif.
-init(Queue, IsDurable, disk) ->
+init(Queue, IsDurable) ->
Len = rabbit_disk_queue:length(Queue),
ok = rabbit_disk_queue:delete_queue(transient_queue(Queue)),
MsgBuf = inc_queue_length(Queue, queue:new(), Len),
+ Size = rabbit_disk_queue:foldl(
+ fun ({Msg, _Size, _IsDelivered, _AckTag}, Acc) ->
+ Acc + size_of_message(Msg)
+ end, 0, Queue),
{ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue,
is_durable = IsDurable, length = Len,
- memory_size = 0, memory_gain = 0, memory_loss = 0 }};
-init(Queue, IsDurable, mixed) ->
- {ok, State} = init(Queue, IsDurable, disk),
- to_mixed_mode([], State).
+ memory_size = Size, memory_gain = undefined,
+ memory_loss = undefined }}.
size_of_message(
#basic_message { content = #content { payload_fragments_rev = Payload }}) ->
@@ -214,7 +216,7 @@ to_mixed_mode(TxnMessages, State =
%% load up a new queue with a token that says how many messages
%% are on disk (this is already built for us by the disk mode)
%% don't actually do anything to the disk
- ok = maybe_prefetch(MsgBuf),
+ ok = maybe_prefetch(mixed, MsgBuf),
%% remove txn messages from disk which are neither persistent and
%% durable. This is necessary to avoid leaks. This is also pretty
%% much the inverse behaviour of our own tx_cancel/2 which is why
@@ -248,10 +250,10 @@ inc_queue_length(Queue, MsgBuf, Count) ->
queue:in({Queue, Count}, MsgBuf)
end.
-dec_queue_length(MsgBuf) ->
+dec_queue_length(Mode, MsgBuf) ->
{{value, {Queue, Len}}, MsgBuf1} = queue:out(MsgBuf),
MsgBuf2 = case Len of
- 1 -> ok = maybe_prefetch(MsgBuf1),
+ 1 -> ok = maybe_prefetch(Mode, MsgBuf1),
MsgBuf1;
_ -> queue:in_r({Queue, Len-1}, MsgBuf1)
end,
@@ -327,7 +329,8 @@ publish_delivered(Msg, State =
deliver(State = #mqstate { length = 0 }) ->
{empty, State};
deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
- is_durable = IsDurable, length = Length }) ->
+ is_durable = IsDurable, length = Length,
+ mode = Mode }) ->
{{value, Value}, MsgBuf1} = queue:out(MsgBuf),
{Msg, IsDelivered, AckTag, MsgBuf2} =
case Value of
@@ -343,10 +346,10 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
false ->
noack
end,
- ok = maybe_prefetch(MsgBuf1),
+ ok = maybe_prefetch(Mode, MsgBuf1),
{Msg1, IsDelivered1, AckTag1, MsgBuf1};
_ ->
- {ReadQ, MsgBuf3} = dec_queue_length(MsgBuf),
+ {ReadQ, MsgBuf3} = dec_queue_length(Mode, MsgBuf),
{Msg1 = #basic_message { is_persistent = IsPersistent },
_Size, IsDelivered1, AckTag1, _PersistRem}
= rabbit_disk_queue:deliver(ReadQ),
@@ -364,7 +367,9 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
{{Msg, IsDelivered, AckTag, Rem},
State #mqstate { msg_buf = MsgBuf2, length = Rem }}.
-maybe_prefetch(MsgBuf) ->
+maybe_prefetch(disk, MsgBuf) ->
+ ok;
+maybe_prefetch(mixed, MsgBuf) ->
case queue:peek(MsgBuf) of
empty ->
ok;
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
index a5e9610a..d4bc21d4 100644
--- a/src/rabbit_queue_mode_manager.erl
+++ b/src/rabbit_queue_mode_manager.erl
@@ -43,7 +43,6 @@
-define(TOTAL_TOKENS, 10000000).
-define(ACTIVITY_THRESHOLD, 25).
--define(INITIAL_TOKEN_ALLOCATION, 100).
-define(SERVER, ?MODULE).
@@ -53,7 +52,7 @@
-spec(start_link/0 :: () ->
({'ok', pid()} | 'ignore' | {'error', any()})).
--spec(register/4 :: (pid(), atom(), atom(), list()) -> {'ok', queue_mode()}).
+-spec(register/4 :: (pid(), atom(), atom(), list()) -> 'ok').
-spec(report_memory/3 :: (pid(), non_neg_integer(), bool()) -> 'ok').
-spec(report_memory/5 :: (pid(), non_neg_integer(),
non_neg_integer(), non_neg_integer(), bool()) ->
@@ -141,7 +140,7 @@ start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
register(Pid, Module, Function, Args) ->
- gen_server2:call(?SERVER, {register, Pid, Module, Function, Args}).
+ gen_server2:cast(?SERVER, {register, Pid, Module, Function, Args}).
pin_to_disk(Pid) ->
gen_server2:call(?SERVER, {pin_to_disk, Pid}).
@@ -173,27 +172,6 @@ init([]) ->
disk_mode_pins = sets:new()
}}.
-handle_call({register, Pid, Module, Function, Args}, _From,
- State = #state { callbacks = Callbacks }) ->
- _MRef = erlang:monitor(process, Pid),
- State1 = State #state { callbacks = dict:store
- (Pid, {Module, Function, Args}, Callbacks) },
- State2 = #state { available_tokens = Avail,
- mixed_queues = Mixed } =
- free_upto(Pid, ?INITIAL_TOKEN_ALLOCATION, State1),
- {Result, State3} =
- case ?INITIAL_TOKEN_ALLOCATION > Avail of
- true ->
- {disk, State2};
- false ->
- {mixed, State2 #state {
- available_tokens =
- Avail - ?INITIAL_TOKEN_ALLOCATION,
- mixed_queues = dict:store
- (Pid, {?INITIAL_TOKEN_ALLOCATION, active}, Mixed) }}
- end,
- {reply, {ok, Result}, State3};
-
handle_call({pin_to_disk, Pid}, _From,
State = #state { mixed_queues = Mixed,
callbacks = Callbacks,
@@ -317,7 +295,13 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
hibernate -> StateN #state { hibernate =
queue:in(Pid, Sleepy) }
end,
- {noreply, StateN1}.
+ {noreply, StateN1};
+
+handle_cast({register, Pid, Module, Function, Args},
+ State = #state { callbacks = Callbacks }) ->
+ _MRef = erlang:monitor(process, Pid),
+ {noreply, State #state { callbacks = dict:store
+ (Pid, {Module, Function, Args}, Callbacks) }}.
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state { available_tokens = Avail,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 221279f7..58a9d0cd 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -990,11 +990,20 @@ rdq_test_purge() ->
rdq_stop(),
passed.
+rdq_new_mixed_queue(Q, Durable, Disk) ->
+ {ok, MS} = rabbit_mixed_queue:init(Q, Durable),
+ MS1 = rabbit_mixed_queue:reset_counters(MS),
+ case Disk of
+ true -> {ok, MS2} = rabbit_mixed_queue:to_disk_only_mode([], MS1),
+ MS2;
+ false -> MS1
+ end.
+
rdq_test_mixed_queue_modes() ->
rdq_virgin(),
rdq_start(),
Payload = <<0:(8*256)>>,
- {ok, MS} = rabbit_mixed_queue:init(q, true, mixed),
+ MS = rdq_new_mixed_queue(q, true, false),
MS2 = lists:foldl(
fun (_N, MS1) ->
Msg = rabbit_basic:message(x, <<>>, [], Payload),
@@ -1041,7 +1050,7 @@ rdq_test_mixed_queue_modes() ->
io:format("Converted to disk only mode~n"),
rdq_stop(),
rdq_start(),
- {ok, MS12} = rabbit_mixed_queue:init(q, true, mixed),
+ MS12 = rdq_new_mixed_queue(q, true, false),
10 = rabbit_mixed_queue:length(MS12),
io:format("Recovered queue~n"),
{MS14, AckTags} =
@@ -1061,7 +1070,7 @@ rdq_test_mixed_queue_modes() ->
io:format("Converted to disk only mode~n"),
rdq_stop(),
rdq_start(),
- {ok, MS17} = rabbit_mixed_queue:init(q, true, mixed),
+ MS17 = rdq_new_mixed_queue(q, true, false),
0 = rabbit_mixed_queue:length(MS17),
{0,0,0} = rabbit_mixed_queue:estimate_queue_memory(MS17),
io:format("Recovered queue~n"),
@@ -1081,23 +1090,23 @@ rdq_test_mode_conversion_mid_txn() ->
rdq_virgin(),
rdq_start(),
- {ok, MS0} = rabbit_mixed_queue:init(q, true, mixed),
+ MS0 = rdq_new_mixed_queue(q, true, false),
passed = rdq_tx_publish_mixed_alter_commit_get(
MS0, MsgsA, MsgsB, fun rabbit_mixed_queue:to_disk_only_mode/2, commit),
rdq_stop_virgin_start(),
- {ok, MS1} = rabbit_mixed_queue:init(q, true, mixed),
+ MS1 = rdq_new_mixed_queue(q, true, false),
passed = rdq_tx_publish_mixed_alter_commit_get(
MS1, MsgsA, MsgsB, fun rabbit_mixed_queue:to_disk_only_mode/2, cancel),
rdq_stop_virgin_start(),
- {ok, MS2} = rabbit_mixed_queue:init(q, true, disk),
+ MS2 = rdq_new_mixed_queue(q, true, true),
passed = rdq_tx_publish_mixed_alter_commit_get(
MS2, MsgsA, MsgsB, fun rabbit_mixed_queue:to_mixed_mode/2, commit),
rdq_stop_virgin_start(),
- {ok, MS3} = rabbit_mixed_queue:init(q, true, disk),
+ MS3 = rdq_new_mixed_queue(q, true, true),
passed = rdq_tx_publish_mixed_alter_commit_get(
MS3, MsgsA, MsgsB, fun rabbit_mixed_queue:to_mixed_mode/2, cancel),