diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-09-23 19:01:31 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-09-23 19:01:31 +0100 |
commit | b0e7ad62a35cd6a42a1dd444f61c1ae2c2b5e720 (patch) | |
tree | 710b0057415c52f5938e2443a130d2939d315351 | |
parent | 21559df7d6eeae794803da2b88dd8eb58c6a6e9a (diff) | |
download | rabbitmq-server-b0e7ad62a35cd6a42a1dd444f61c1ae2c2b5e720.tar.gz |
replace msg_store:attrs with contains
This is a step on the path to getting rid of message attributes in msg_store.
msg_store:attrs was only being used in disk_queue:prune, to detect
when the store contained a non-persistent message and remove that
message from the store and the rabbit_disk_queue table.
Now rabbit_disk_queue records contain an IsPersistent flag. By making
the msg count delta generator pay attention to that flag, we trim
non-persistent messages from the store during its initialisation.
As a result, disk_queue:prune no longer needs to remove messages from
the store; it just needs to remove all entries from the
rabbit_disk_queue table whose messages are no longer contained in the
store - hence the new msg_store:contains function.
Keeping the IsPersistent flag in the rabbit_disk_queue table is
sub-optimal since it means we store it once per message reference
rather than just once per message. That's a small price to pay, though,
for the cleaner interaction between the disk_queue and msg_store, and
the opportunity to remove the notion of message attributes from
msg_store altogether.
Populating the new field in rabbit_disk_queue is straightforward in
most places except disk_queue:tx_commit. That used to just be given
{MsgId, IsDelivered} tuples, so I had to change the API to {MsgId,
IsDelivered, IsPersistent} tuples.
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_disk_queue.erl | 93 | ||||
-rw-r--r-- | src/rabbit_mixed_queue.erl | 11 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 10 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 17 |
5 files changed, 65 insertions, 68 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 095044e7..bebaee98 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -65,7 +65,7 @@ -record(basic_message, {exchange_name, routing_key, content, guid, is_persistent}). --record(dq_msg_loc, {queue_and_seq_id, is_delivered, msg_id}). +-record(dq_msg_loc, {queue_and_seq_id, is_delivered, is_persistent, msg_id}). -record(ssl_socket, {tcp, ssl}). -record(delivery, {mandatory, immediate, txn, sender, message}). diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index af5d808a..0f69f83b 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -62,7 +62,8 @@ -define(SHUTDOWN_MESSAGE, #dq_msg_loc { queue_and_seq_id = ?SHUTDOWN_MESSAGE_KEY, msg_id = infinity_and_beyond, - is_delivered = never + is_delivered = never, + is_persistent = true }). -define(SYNC_INTERVAL, 5). %% milliseconds @@ -98,7 +99,8 @@ -spec(prefetch/1 :: (queue_name()) -> 'ok'). -spec(ack/2 :: (queue_name(), [ack_tag()]) -> 'ok'). -spec(tx_publish/1 :: (message()) -> 'ok'). --spec(tx_commit/3 :: (queue_name(), [{msg_id(), boolean()}], [ack_tag()]) -> +-spec(tx_commit/3 :: (queue_name(), [{msg_id(), boolean(), boolean()}], + [ack_tag()]) -> 'ok'). -spec(tx_rollback/1 :: ([msg_id()]) -> 'ok'). -spec(requeue/2 :: (queue_name(), [{ack_tag(), boolean()}]) -> 'ok'). 
@@ -198,9 +200,10 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> ok = detect_shutdown_state_and_adjust_delivered_flags(), - Store = prune(rabbit_msg_store:init(base_directory(), - FileSizeLimit, ReadFileHandlesLimit, - fun msg_ref_gen/1, msg_ref_gen_init())), + Store = rabbit_msg_store:init(base_directory(), + FileSizeLimit, ReadFileHandlesLimit, + fun msg_ref_gen/1, msg_ref_gen_init()), + ok = prune(Store), Sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]), ok = extract_sequence_numbers(Sequences), @@ -449,7 +452,8 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, State = #dqstate { store = Store, on_sync_txns = Txns }) -> TxnDetails = {Q, PubMsgIds, AckSeqIds, From}, case rabbit_msg_store:needs_sync( - [MsgId || {MsgId, _IsDelivered} <- PubMsgIds], Store) of + [MsgId || {MsgId, _IsDelivered, _IsPersistent} <- PubMsgIds], + Store) of true -> Txns1 = [TxnDetails | Txns], State #dqstate { on_sync_txns = Txns1 }; false -> internal_do_tx_commit(TxnDetails, State) @@ -464,12 +468,13 @@ internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From}, ok = mnesia:write_lock_table(rabbit_disk_queue), {ok, WriteSeqId1} = lists:foldl( - fun ({MsgId, IsDelivered}, {ok, SeqId}) -> + fun ({MsgId, IsDelivered, IsPersistent}, {ok, SeqId}) -> {mnesia:write( rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, SeqId}, msg_id = MsgId, - is_delivered = IsDelivered + is_delivered = IsDelivered, + is_persistent = IsPersistent }, write), SeqId + 1} end, {ok, InitWriteSeqId}, PubMsgIds), @@ -483,7 +488,8 @@ internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From}, gen_server2:reply(From, ok), State1. 
-internal_publish(Q, Message = #basic_message { guid = MsgId }, +internal_publish(Q, Message = #basic_message { guid = MsgId, + is_persistent = IsPersistent }, IsDelivered, State) -> {ok, State1 = #dqstate { sequences = Sequences }} = internal_tx_publish(Message, State), @@ -491,7 +497,8 @@ internal_publish(Q, Message = #basic_message { guid = MsgId }, ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId}, msg_id = MsgId, - is_delivered = IsDelivered}), + is_delivered = IsDelivered, + is_persistent = IsPersistent }), true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId + 1}), {ok, {MsgId, WriteSeqId}, State1}. @@ -694,54 +701,42 @@ msg_ref_gen_init() -> mnesia:dirty_first(rabbit_disk_queue). msg_ref_gen('$end_of_table') -> finished; msg_ref_gen(Key) -> - [Obj] = mnesia:dirty_read(rabbit_disk_queue, Key), - {Obj #dq_msg_loc.msg_id, 1, mnesia:dirty_next(rabbit_disk_queue, Key)}. + [#dq_msg_loc { msg_id = MsgId, is_persistent = IsPersistent }] = + mnesia:dirty_read(rabbit_disk_queue, Key), + NextKey = mnesia:dirty_next(rabbit_disk_queue, Key), + {MsgId, case IsPersistent of true -> 1; false -> 0 end, NextKey}. -prune_flush_batch(DeleteAcc, RemoveAcc, Store) -> +prune_flush_batch(DeleteAcc) -> lists:foldl(fun (Key, ok) -> mnesia:dirty_delete(rabbit_disk_queue, Key) - end, ok, DeleteAcc), - rabbit_msg_store:remove(RemoveAcc, Store). + end, ok, DeleteAcc). prune(Store) -> - prune(Store, mnesia:dirty_first(rabbit_disk_queue), [], [], 0). + prune(Store, mnesia:dirty_first(rabbit_disk_queue), [], 0). 
-prune(Store, '$end_of_table', _DeleteAcc, _RemoveAcc, 0) -> - Store; -prune(Store, '$end_of_table', DeleteAcc, RemoveAcc, _Len) -> - prune_flush_batch(DeleteAcc, RemoveAcc, Store); -prune(Store, Key, DeleteAcc, RemoveAcc, Len) -> +prune(_Store, '$end_of_table', DeleteAcc, _Len) -> + prune_flush_batch(DeleteAcc); +prune(Store, Key, DeleteAcc, Len) -> [#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }] = mnesia:dirty_read(rabbit_disk_queue, Key), - {DeleteAcc1, RemoveAcc1, Len1} = - case rabbit_msg_store:attrs(MsgId, Store) of - not_found -> - %% msg hasn't been found on disk, delete it - {[{Q, SeqId} | DeleteAcc], RemoveAcc, Len + 1}; - true -> - %% msg is persistent, keep it - {DeleteAcc, RemoveAcc, Len}; - false -> - %% msg is not persistent, delete it - {[{Q, SeqId} | DeleteAcc], [MsgId | RemoveAcc], Len + 1} - end, - {Store1, Key1, DeleteAcc2, RemoveAcc2, Len2} = - if - Len1 >= ?BATCH_SIZE -> - %% We have no way of knowing how flushing the batch - %% will affect ordering of records within the table, - %% so have no choice but to start again. Although this - %% will make recovery slower for large queues, we - %% guarantee we can start up in constant memory - Store2 = prune_flush_batch(DeleteAcc1, RemoveAcc1, - Store), - Key2 = mnesia:dirty_first(rabbit_disk_queue), - {Store2, Key2, [], [], 0}; - true -> - Key2 = mnesia:dirty_next(rabbit_disk_queue, Key), - {Store, Key2, DeleteAcc1, RemoveAcc1, Len1} + {DeleteAcc1, Len1} = + case rabbit_msg_store:contains(MsgId, Store) of + true -> {DeleteAcc, Len}; + false -> {[{Q, SeqId} | DeleteAcc], Len + 1} end, - prune(Store1, Key1, DeleteAcc2, RemoveAcc2, Len2). + if Len1 >= ?BATCH_SIZE -> + %% We have no way of knowing how flushing the batch will + %% affect ordering of records within the table, so have no + %% choice but to start again. 
Although this will make + %% recovery slower for large queues, we guarantee we can + %% start up in constant memory + ok = prune_flush_batch(DeleteAcc1), + NextKey = mnesia:dirty_first(rabbit_disk_queue), + prune(Store, NextKey, [], 0); + true -> + NextKey = mnesia:dirty_next(rabbit_disk_queue, Key), + prune(Store, NextKey, DeleteAcc1, Len1) + end. extract_sequence_numbers(Sequences) -> true = diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 74e1da2b..c278bac8 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -235,7 +235,7 @@ tx_commit(Publishes, MsgsWithAcks, State = #mqstate { mode = Mode, queue = Q, msg_buf = MsgBuf, is_durable = IsDurable, length = Length }) -> PersistentPubs = - [{MsgId, false} || + [{MsgId, false, IsPersistent} || #basic_message { guid = MsgId, is_persistent = IsPersistent } <- Publishes, on_disk(Mode, IsDurable, IsPersistent)], @@ -534,12 +534,13 @@ send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount, Commit, Ack, inc_queue_length(MsgBuf, Count)) end. -republish_message_to_disk_queue(IsDurable, Q, Queue, PublishCount, RequeueCount, - Commit, Ack, MsgBuf, Msg = - #basic_message { guid = MsgId }, IsDelivered) -> +republish_message_to_disk_queue( + IsDurable, Q, Queue, PublishCount, RequeueCount, Commit, Ack, MsgBuf, + Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, + IsDelivered) -> {Commit1, Ack1} = flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack), ok = rabbit_disk_queue:tx_publish(Msg), - Commit2 = [{MsgId, IsDelivered} | Commit1], + Commit2 = [{MsgId, IsDelivered, IsPersistent} | Commit1], {PublishCount1, Commit3, Ack2} = case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of true -> ok = flush_messages_to_disk_queue(Q, Commit2, Ack1), diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index aee501c3..b752b9f6 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -31,7 +31,7 @@ -module(rabbit_msg_store). 
--export([init/5, write/4, read/2, attrs/2, remove/2, release/2, +-export([init/5, write/4, read/2, contains/2, remove/2, release/2, needs_sync/2, sync/1, cleanup/1]). %%---------------------------------------------------------------------------- @@ -99,7 +99,7 @@ A) -> msstate()). -spec(write/4 :: (msg_id(), msg(), msg_attrs(), msstate()) -> msstate()). -spec(read/2 :: (msg_id(), msstate()) -> {msg(), msstate()} | 'not_found'). --spec(attrs/2 :: (msg_id(), msstate()) -> msg_attrs() | 'not_found'). +-spec(contains/2 :: (msg_id(), msstate()) -> boolean()). -spec(remove/2 :: ([msg_id()], msstate()) -> msstate()). -spec(release/2 :: ([msg_id()], msstate()) -> msstate()). -spec(needs_sync/2 :: ([msg_id()], msstate()) -> boolean()). @@ -354,10 +354,10 @@ read(MsgId, State) -> end end. -attrs(MsgId, State) -> +contains(MsgId, State) -> case index_lookup(MsgId, State) of - not_found -> not_found; - #msg_location { attrs = Attrs } -> Attrs + not_found -> false; + #msg_location {} -> true end. remove(MsgIds, State = #msstate { current_file = CurFile }) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1e50696a..9d9e60ba 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -862,9 +862,10 @@ rdq_match_messages(#basic_message { guid = MsgId, content = #content { payload_f #basic_message { guid = MsgId, content = #content { payload_fragments_rev = MsgBody }}) -> ok. -commit_list(List, MsgCount) -> - lists:zip([term_to_binary(MsgId) || MsgId <- List], - lists:duplicate(MsgCount, false)). +commit_list(List, MsgCount, IsPersistent) -> + lists:zip3([term_to_binary(MsgId) || MsgId <- List], + lists:duplicate(MsgCount, false), + lists:duplicate(MsgCount, IsPersistent)). 
rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> Startup = rdq_virgin(), @@ -872,7 +873,7 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> QCount = length(Qs), Msg = <<0:(8*MsgSizeBytes)>>, List = lists:seq(1, MsgCount), - CommitList = commit_list(List, MsgCount), + CommitList = commit_list(List, MsgCount, false), {Publish, ok} = timer:tc(?MODULE, rdq_time_commands, [[fun() -> [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) @@ -911,7 +912,7 @@ rdq_stress_gc(MsgCount) -> MsgSizeBytes = 256*1024, Msg = <<0:(8*MsgSizeBytes)>>, % 256KB List = lists:seq(1, MsgCount), - CommitList = commit_list(List, MsgCount), + CommitList = commit_list(List, MsgCount, false), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- List], rabbit_disk_queue:tx_commit(q, CommitList, []), StartChunk = round(MsgCount / 20), % 5% @@ -954,7 +955,7 @@ rdq_test_startup_with_queue_gaps() -> Total = 1000, Half = round(Total/2), All = lists:seq(1,Total), - CommitAll = commit_list(All, Total), + CommitAll = commit_list(All, Total, true), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, true)) || N <- All], rabbit_disk_queue:tx_commit(q, CommitAll, []), io:format("Publish done~n", []), @@ -1011,7 +1012,7 @@ rdq_test_redeliver() -> Total = 1000, Half = round(Total/2), All = lists:seq(1,Total), - CommitAll = commit_list(All, Total), + CommitAll = commit_list(All, Total, false), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All], rabbit_disk_queue:tx_commit(q, CommitAll, []), io:format("Publish done~n", []), @@ -1064,7 +1065,7 @@ rdq_test_purge() -> Total = 1000, Half = round(Total/2), All = lists:seq(1,Total), - CommitAll = commit_list(All, Total), + CommitAll = commit_list(All, Total, false), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All], rabbit_disk_queue:tx_commit(q, CommitAll, []), io:format("Publish done~n", []), |